2025-01-20 14:33:03 +05:00

258 lines
8.8 KiB
Python

import logging
from django.utils.translation import gettext_lazy as _
from django.contrib.auth import get_user_model
from django.core.exceptions import PermissionDenied
from django.core.validators import RegexValidator
from rest_framework import serializers
from djangopasswordlessknox.models import CallbackToken
from djangopasswordlessknox.settings import api_settings
from djangopasswordlessknox.utils import authenticate_by_token, verify_user_alias, validate_token_age
logger = logging.getLogger(__name__)
User = get_user_model()
class TokenField(serializers.CharField):
    """Character field for callback tokens with token-specific error messages.

    Missing, blank and otherwise invalid input all share one generic
    message; the two length messages carry a placeholder for the bound.
    """

    default_error_messages = {
        # Length violations: the template names the relevant bound.
        'min_length': _('Tokens are {min_length} digits long.'),
        'max_length': _('Tokens are {max_length} digits long.'),
        # Everything else collapses to the same generic message.
        'blank': _('Invalid Token'),
        'invalid': _('Invalid Token'),
        'required': _('Invalid Token'),
    }
class AbstractBaseAliasAuthenticationSerializer(serializers.Serializer):
    """
    Abstract serializer that resolves a submitted alias to an active user.

    Subclasses declare the alias field and override ``alias_type`` with the
    name of that field. On success ``validate`` stores the matching user
    under ``attrs['user']``.
    """

    @property
    def alias_type(self):
        # The alias type, either email or mobile
        raise NotImplementedError

    def validate(self, attrs):
        """
        Look up the user owning the submitted alias.

        Returns ``attrs`` with the resolved user added under ``'user'``.

        Raises ``serializers.ValidationError`` when the alias is missing,
        when no user owns it, or when the owning account is disabled.
        """
        alias_type = self.alias_type
        alias = attrs.get(alias_type)
        if not alias:
            raise serializers.ValidationError(_('Missing %s.') % alias_type)

        # The lookup is the same whichever way the registration setting is
        # configured; only the "not found" message differs.
        try:
            user = User.objects.get(**{alias_type: alias})
        except User.DoesNotExist:
            user = None

        if not user:
            generic = _('No account is associated with this alias.')
            if api_settings.PASSWORDLESS_REGISTER_NEW_USERS is True:
                specific = {
                    'email': _('No user found with this email.'),
                    'mobile': _('No user found with this mobile number.'),
                }
                # Fall back to the generic text for any other alias type
                # instead of raising an empty error message.
                raise serializers.ValidationError(
                    specific.get(alias_type, generic))
            raise serializers.ValidationError(generic)

        if not user.is_active:
            raise serializers.ValidationError(_('User account is disabled.'))

        # Valid: hand the user back so the caller can create a token.
        attrs['user'] = user
        return attrs
class EmailAuthSerializer(AbstractBaseAliasAuthenticationSerializer):
    """Alias authentication serializer keyed on the ``email`` field."""

    email = serializers.EmailField()

    @property
    def alias_type(self):
        """Name of the alias field this serializer validates."""
        return 'email'
class MobileAuthSerializer(AbstractBaseAliasAuthenticationSerializer):
    """Alias authentication serializer keyed on the ``mobile`` field."""

    @property
    def alias_type(self):
        """Name of the alias field this serializer validates."""
        return 'mobile'

    # Seven digits, the first being 7 or 9. The class is ``[79]`` rather
    # than ``[7|9]``: inside a character class ``|`` is a literal, so the
    # old pattern also accepted numbers starting with "|".
    phone_regex = RegexValidator(
        regex=r'^[79][0-9]{6}$',
        message="Mobile number must be entered in the format:"
                " '7xxxxxx' or '9xxxxxx'.",
    )
    mobile = serializers.CharField(validators=[phone_regex], max_length=15)
"""
Verification
"""
class AbstractBaseAliasVerificationSerializer(serializers.Serializer):
    """
    Abstract serializer that checks the requesting user can verify an alias.

    Subclasses override ``alias_type`` with the name of the user attribute
    to check. On success ``validate`` stores the request's user under
    ``attrs['user']``.
    """

    @property
    def alias_type(self):
        # The alias type, either email or mobile
        raise NotImplementedError

    def validate(self, attrs):
        """
        Check that the request's user is active and has the alias attribute.

        Returns ``attrs`` with the user added under ``'user'``.

        Raises ``serializers.ValidationError`` when the alias type is empty,
        when the context has no usable request/user, when the account is
        disabled, or when the user lacks the alias attribute.
        """
        alias_type = self.alias_type
        if not alias_type:
            raise serializers.ValidationError(_('Missing %s.') % alias_type)

        # Generic failure, used when no request or user is available.
        msg = _('There was a problem with your request.')

        # ``.get`` so that a context without a request produces a validation
        # error rather than an unhandled KeyError.
        request = self.context.get("request")
        if request and hasattr(request, "user"):
            user = request.user
            if user:
                if not user.is_active:
                    msg = _('User account is disabled.')
                elif hasattr(user, alias_type):
                    # Has the appropriate alias type
                    attrs['user'] = user
                    return attrs
                else:
                    # Interpolate after the translation lookup so the msgid
                    # stays the constant template.
                    msg = _('This user doesn\'t have an %s.') % alias_type

        raise serializers.ValidationError(msg)
class EmailVerificationSerializer(AbstractBaseAliasVerificationSerializer):
    """Alias verification serializer for the ``email`` alias."""

    @property
    def alias_type(self):
        """Name of the user attribute checked during verification."""
        return 'email'
class MobileVerificationSerializer(AbstractBaseAliasVerificationSerializer):
    """Alias verification serializer for the ``mobile`` alias."""

    @property
    def alias_type(self):
        """Name of the user attribute checked during verification."""
        return 'mobile'
"""
Callback Token
"""
def token_age_validator(value):
    """
    Field validator that rejects tokens failing the age check.

    Delegates to ``validate_token_age``; a falsy result raises
    ``serializers.ValidationError``, otherwise the value is returned
    unchanged.
    """
    if validate_token_age(value):
        return value
    raise serializers.ValidationError("The token you entered isn't valid.")
class AbstractBaseCallbackTokenSerializer(serializers.Serializer):
    """
    Base serializer for callback-token submissions.

    Declares the shared ``token`` field: exactly six characters long and
    checked by ``token_age_validator``.
    """

    token = TokenField(
        validators=[token_age_validator],
        min_length=6,
        max_length=6,
    )
class CallbackTokenAuthSerializer(AbstractBaseCallbackTokenSerializer):
    """Authenticates a user from an active callback token."""

    def validate(self, attrs):
        """
        Resolve the submitted token to an active user.

        Returns ``attrs`` with the authenticated user added under ``'user'``.

        Raises ``serializers.ValidationError`` when no active token matches,
        when the token does not authenticate a user, when the account is
        disabled, or when marking the alias as verified fails.
        """
        callback_token = attrs.get('token', None)

        # ``get`` raises rather than returning None, so an unknown or
        # inactive token has to be caught here to become a validation error.
        try:
            token = CallbackToken.objects.get(key=callback_token, is_active=True)
        except CallbackToken.DoesNotExist:
            raise serializers.ValidationError(_('Missing authentication token.'))

        # Check the token type for our uni-auth method.
        # authenticates and checks the expiry of the callback token.
        user = authenticate_by_token(token)
        if not user:
            raise serializers.ValidationError(_('Invalid Token'))

        if not user.is_active:
            raise serializers.ValidationError(_('User account is disabled.'))

        if api_settings.PASSWORDLESS_USER_MARK_EMAIL_VERIFIED \
                or api_settings.PASSWORDLESS_USER_MARK_MOBILE_VERIFIED:
            # Mark this alias as verified
            user = User.objects.get(pk=token.user.pk)
            if verify_user_alias(user, token) is False:
                raise serializers.ValidationError(
                    _('Error validating user alias.'))

        attrs['user'] = user
        return attrs
class CallbackTokenVerificationSerializer(AbstractBaseCallbackTokenSerializer):
    """
    Takes a user and a token, verifies the token belongs to the user and
    validates the alias that the token was sent from.
    """

    def validate(self, attrs):
        """
        Verify the alias of the user named by ``context['user_id']``.

        Returns ``attrs`` with the user added under ``'user'``.

        Raises ``serializers.ValidationError`` when the token or user cannot
        be found, when the token belongs to a different user, when alias
        verification fails, or when permission is denied.
        """
        try:
            user_id = self.context.get("user_id")
            callback_token = attrs.get('token', None)

            token = CallbackToken.objects.get(key=callback_token, is_active=True)
            user = User.objects.get(pk=user_id)

            if token.user == user:
                # Check that the token.user is the request.user
                # Mark this alias as verified
                success = verify_user_alias(user, token)
                if success is False:
                    # A failed verification must not be reported as success.
                    logger.debug("djangopasswordlessknox: Error verifying alias.")
                    raise serializers.ValidationError(
                        _('Error validating user alias.'))

                attrs['user'] = user
                return attrs

            msg = _('This token is invalid. Try again later.')
            logger.debug("djangopasswordlessknox: User token mismatch when verifying alias.")

        except CallbackToken.DoesNotExist:
            msg = _('Missing authentication token.')
            logger.debug("djangopasswordlessknox: Tried to validate alias with bad token.")
        except User.DoesNotExist:
            msg = _('Missing user.')
            logger.debug("djangopasswordlessknox: Tried to validate alias with bad user.")
        except PermissionDenied:
            msg = _('Insufficient permissions.')
            logger.debug("djangopasswordlessknox: Permission denied while validating alias.")

        # Every path that did not return or raise above has set ``msg``.
        raise serializers.ValidationError(msg)