mirror of
https://github.com/i701/sarlink-portal-api.git
synced 2025-02-21 18:12:00 +00:00
258 lines
8.8 KiB
Python
258 lines
8.8 KiB
Python
import logging
|
|
from django.utils.translation import gettext_lazy as _
|
|
from django.contrib.auth import get_user_model
|
|
from django.core.exceptions import PermissionDenied
|
|
from django.core.validators import RegexValidator
|
|
from rest_framework import serializers
|
|
from djangopasswordlessknox.models import CallbackToken
|
|
from djangopasswordlessknox.settings import api_settings
|
|
from djangopasswordlessknox.utils import authenticate_by_token, verify_user_alias, validate_token_age
|
|
|
|
logger = logging.getLogger(__name__)
|
|
User = get_user_model()
|
|
|
|
|
|
class TokenField(serializers.CharField):
    """Character field for callback tokens with token-specific error messages."""

    # The required / invalid / blank failures all share one generic message;
    # only the length checks mention the expected number of digits.
    default_error_messages = dict(
        required=_('Invalid Token'),
        invalid=_('Invalid Token'),
        blank=_('Invalid Token'),
        max_length=_('Tokens are {max_length} digits long.'),
        min_length=_('Tokens are {min_length} digits long.'),
    )
|
|
|
|
|
|
class AbstractBaseAliasAuthenticationSerializer(serializers.Serializer):
    """
    Abstract serializer that resolves an alias (email or mobile) to a user.

    Subclasses declare the alias field and override ``alias_type`` with the
    name of that field. On success ``validate`` returns ``attrs`` with the
    matching active user stored under ``'user'``.
    """

    @property
    def alias_type(self):
        # The alias type, either email or mobile
        raise NotImplementedError

    def validate(self, attrs):
        """
        Look up the user that owns the submitted alias.

        Raises ``serializers.ValidationError`` when the alias is missing,
        when no user has that alias, or when the user is inactive.
        """
        alias_type = self.alias_type
        alias = attrs.get(alias_type)

        if not alias:
            raise serializers.ValidationError(_('Missing %s.') % alias_type)

        try:
            user = User.objects.get(**{alias_type: alias})
        except User.DoesNotExist:
            user = None

        if user is None:
            # Users are never created here; only the wording of the error
            # depends on PASSWORDLESS_REGISTER_NEW_USERS.
            msg = _('No account is associated with this alias.')
            if api_settings.PASSWORDLESS_REGISTER_NEW_USERS is True:
                if alias_type == 'email':
                    msg = _('No user found with this email.')
                elif alias_type == 'mobile':
                    msg = _('No user found with this mobile number.')
                # Any other alias type keeps the generic message instead of
                # producing an empty error string.
            raise serializers.ValidationError(msg)

        if not user.is_active:
            raise serializers.ValidationError(_('User account is disabled.'))

        # Valid: hand the user back so the caller can issue a token for it.
        attrs['user'] = user
        return attrs
|
|
|
|
|
|
class EmailAuthSerializer(AbstractBaseAliasAuthenticationSerializer):
    """Alias authentication serializer keyed on the ``email`` field."""

    email = serializers.EmailField()

    @property
    def alias_type(self):
        """Name of the alias field this serializer validates."""
        return 'email'
|
|
|
|
|
|
class MobileAuthSerializer(AbstractBaseAliasAuthenticationSerializer):
    """Alias authentication serializer keyed on the ``mobile`` field."""

    @property
    def alias_type(self):
        """Name of the alias field this serializer validates."""
        return 'mobile'

    # Seven digits starting with 7 or 9. Inside a character class "|" is a
    # literal character, not alternation, so the class must be written
    # "[79]" -- "[7|9]" would also accept a leading "|".
    phone_regex = RegexValidator(
        regex=r'^[79][0-9]{6}$',
        message="Mobile number must be entered in the format:"
                " '7xxxxxx' or '9xxxxxx'.",
    )
    mobile = serializers.CharField(validators=[phone_regex], max_length=15)
|
|
|
|
|
|
"""
|
|
Verification
|
|
"""
|
|
|
|
|
|
class AbstractBaseAliasVerificationSerializer(serializers.Serializer):
    """
    Abstract serializer that checks the requesting user can verify an alias.

    Subclasses override ``alias_type`` with the name of the user attribute
    (email or mobile) being verified. On success ``validate`` returns
    ``attrs`` with the request's user stored under ``'user'``.
    """

    @property
    def alias_type(self):
        # The alias type, either email or mobile
        raise NotImplementedError

    def validate(self, attrs):
        """
        Validate that the request's user is active and has the alias attribute.

        The request is read from the serializer context under ``"request"``.
        Raises ``serializers.ValidationError`` in every non-success case.
        """
        if not self.alias_type:
            raise serializers.ValidationError(_('Missing %s.') % self.alias_type)

        # Generic fallback used when there is no request or no user on it.
        msg = _('There was a problem with your request.')

        # .get() so that a missing request yields a validation error rather
        # than a KeyError.
        request = self.context.get("request")
        if request and hasattr(request, "user"):
            user = request.user
            if user:
                if not user.is_active:
                    msg = _('User account is disabled.')
                elif hasattr(user, self.alias_type):
                    # Has the appropriate alias type
                    attrs['user'] = user
                    return attrs
                else:
                    # Interpolate after the translation call so the catalog
                    # lookup uses the constant message, not the formatted one.
                    msg = _('This user doesn\'t have an %s.') % self.alias_type

        raise serializers.ValidationError(msg)
|
|
|
|
|
|
class EmailVerificationSerializer(AbstractBaseAliasVerificationSerializer):
    """Alias verification serializer for the user's ``email`` attribute."""

    @property
    def alias_type(self):
        """Name of the user attribute being verified."""
        return 'email'
|
|
|
|
|
|
class MobileVerificationSerializer(AbstractBaseAliasVerificationSerializer):
    """Alias verification serializer for the user's ``mobile`` attribute."""

    @property
    def alias_type(self):
        """Name of the user attribute being verified."""
        return 'mobile'
|
|
|
|
|
|
"""
|
|
Callback Token
|
|
"""
|
|
|
|
|
|
def token_age_validator(value):
    """
    Field validator that rejects tokens failing the age check.

    Delegates to ``validate_token_age``; returns ``value`` unchanged when
    that check is truthy and raises ``serializers.ValidationError`` otherwise.
    """
    if validate_token_age(value):
        return value
    raise serializers.ValidationError("The token you entered isn't valid.")
|
|
|
|
|
|
class AbstractBaseCallbackTokenSerializer(serializers.Serializer):
    """
    Base serializer for callback-token input.

    Declares the single ``token`` field: exactly six characters long and
    checked by ``token_age_validator``. Subclasses supply ``validate``.
    """

    token = TokenField(
        min_length=6,
        max_length=6,
        validators=[token_age_validator],
    )
|
|
|
|
|
|
class CallbackTokenAuthSerializer(AbstractBaseCallbackTokenSerializer):
    """
    Authenticates a user from a submitted callback token.

    On success ``validate`` returns ``attrs`` with the authenticated user
    stored under ``'user'``.
    """

    def validate(self, attrs):
        """
        Resolve the active callback token and authenticate its user.

        Raises ``serializers.ValidationError`` when no active token matches,
        when authentication by token fails, when the user is inactive, or
        when marking the alias as verified fails.
        """
        callback_token = attrs.get('token', None)

        try:
            token = CallbackToken.objects.get(key=callback_token, is_active=True)
        except CallbackToken.DoesNotExist:
            # Without this, a token that is missing or no longer active would
            # surface as an unhandled exception instead of a validation error.
            token = None

        if token is None:
            raise serializers.ValidationError(_('Missing authentication token.'))

        # Check the token type for our uni-auth method.
        # NOTE(review): per the original comment this also checks the expiry
        # of the callback token -- confirm against authenticate_by_token.
        user = authenticate_by_token(token)
        if not user:
            raise serializers.ValidationError(_('Invalid Token'))

        if not user.is_active:
            raise serializers.ValidationError(_('User account is disabled.'))

        if api_settings.PASSWORDLESS_USER_MARK_EMAIL_VERIFIED \
                or api_settings.PASSWORDLESS_USER_MARK_MOBILE_VERIFIED:
            # Mark this alias as verified
            user = User.objects.get(pk=token.user.pk)
            success = verify_user_alias(user, token)

            if success is False:
                raise serializers.ValidationError(_('Error validating user alias.'))

        attrs['user'] = user
        return attrs
|
|
|
|
|
|
class CallbackTokenVerificationSerializer(AbstractBaseCallbackTokenSerializer):
    """
    Verifies an alias using a callback token and a user id.

    The user id is read from the serializer context under ``"user_id"``;
    the token must be active and belong to that user.
    """

    def validate(self, attrs):
        """
        Return ``attrs`` with ``'user'`` set when the token belongs to the user.

        Raises ``serializers.ValidationError`` for an unknown token, an
        unknown user, a token/user mismatch, or a permission error.
        """
        user_id = self.context.get("user_id")
        callback_token = attrs.get('token', None)

        try:
            token = CallbackToken.objects.get(key=callback_token, is_active=True)
            user = User.objects.get(pk=user_id)

            if token.user == user:
                # The token belongs to the requested user: mark the alias
                # as verified.
                # NOTE(review): a False result is only logged; validation
                # still succeeds.
                if verify_user_alias(user, token) is False:
                    logger.debug("djangopasswordlessknox: Error verifying alias.")

                attrs['user'] = user
                return attrs

            msg = _('This token is invalid. Try again later.')
            logger.debug("djangopasswordlessknox: User token mismatch when verifying alias.")

        except CallbackToken.DoesNotExist:
            msg = _('Missing authentication token.')
            logger.debug("djangopasswordlessknox: Tried to validate alias with bad token.")
        except User.DoesNotExist:
            msg = _('Missing user.')
            logger.debug("djangopasswordlessknox: Tried to validate alias with bad user.")
        except PermissionDenied:
            msg = _('Insufficient permissions.')
            logger.debug("djangopasswordlessknox: Permission denied while validating alias.")

        # Every path that did not return above has set msg.
        raise serializers.ValidationError(msg)
|