Initial commit

This commit is contained in:
2025-01-20 14:33:03 +05:00
commit 4d0eb86478
84 changed files with 4436 additions and 0 deletions

View File

@ -0,0 +1,12 @@
# -*- coding: utf-8 -*-
__title__ = 'djangopasswordlessknox'
__version__ = '1.4.0'
__author__ = 'Lijo'
__license__ = 'MIT'
__copyright__ = 'Copyright 2019 lijo'
# Version synonym
VERSION = __version__
default_app_config = 'djangopasswordlessknox.apps.DrfpasswordlessConfig'

View File

@ -0,0 +1,3 @@
VERSION = (1, 4, 0)
__version__ = '.'.join(map(str, VERSION))

View File

@ -0,0 +1,34 @@
from django.contrib import admin
from django.urls import reverse
from djangopasswordlessknox.models import CallbackToken
class UserLinkMixin(object):
"""
A mixin to add a linkable list_display user field.
"""
LINK_TO_USER_FIELD = 'link_to_user'
def link_to_user(self, obj):
link = reverse('admin:users_user_change', args=[obj.user.id])
return u'<a href={}>{}</a>'.format(link, obj.user.username)
link_to_user.allow_tags = True
link_to_user.short_description = 'User'
class AbstractCallbackTokenInline(admin.StackedInline):
max_num = 0
extra = 0
readonly_fields = ('created_at', 'key', 'is_active')
fields = ('created_at', 'user', 'key', 'is_active')
class CallbackInline(AbstractCallbackTokenInline):
model = CallbackToken
class AbstractCallbackTokenAdmin(UserLinkMixin, admin.ModelAdmin):
readonly_fields = ('created_at', 'user', 'key')
list_display = ('created_at', UserLinkMixin.LINK_TO_USER_FIELD, 'key', 'is_active')
fields = ('created_at', 'user', 'key', 'is_active')
extra = 0

View File

@ -0,0 +1,9 @@
from django.apps import AppConfig
from django.utils.translation import gettext_lazy as _
class DrfpasswordlessConfig(AppConfig):
name = 'djangopasswordlessknox'
verbose = _("DRF Passwordless")
def ready(self):
import djangopasswordlessknox.signals

View File

@ -0,0 +1,43 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.10.6 on 2017-04-05 03:27
from __future__ import unicode_literals
from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion
import djangopasswordlessknox.models
import uuid
class Migration(migrations.Migration):
initial = True
dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]
operations = [
migrations.CreateModel(
name='CallbackToken',
fields=[
('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False, unique=True)),
('created_at', models.DateTimeField(auto_now_add=True)),
('is_active', models.BooleanField(default=True)),
('to_alias', models.CharField(blank=True, max_length=40)),
('to_alias_type', models.CharField(blank=True, max_length=20)),
('key', models.CharField(default=djangopasswordlessknox.models.generate_numeric_token, max_length=6, unique=True)),
('user', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL)),
],
options={
'verbose_name': 'Callback Token',
'abstract': False,
'ordering': ['-id'],
'get_latest_by': 'created_at',
},
),
migrations.AlterUniqueTogether(
name='callbacktoken',
unique_together=set([('key', 'is_active')]),
),
]

View File

@ -0,0 +1,62 @@
import uuid
from random import randint
from django.db import models
from django.conf import settings
def generate_hex_token():
return uuid.uuid1().hex
def generate_numeric_token():
"""
Generate a random 6 digit string of numbers.
We use this formatting to allow leading 0s.
"""
return str("%06d" % randint(0, 999999))
class CallbackTokenManger(models.Manager):
def active(self):
return self.get_queryset().filter(is_active=True)
def inactive(self):
return self.get_queryset().filter(is_active=False)
class AbstractBaseCallbackToken(models.Model):
"""
Callback Authentication Tokens
These tokens present a client with their authorization token
on successful exchange of a random token (email) or token (for mobile)
When a new token is created, older ones of the same type are invalidated
via the pre_save signal in signals.py.
"""
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False, unique=True)
created_at = models.DateTimeField(auto_now_add=True)
user = models.ForeignKey(settings.AUTH_USER_MODEL, related_name=None, on_delete=models.CASCADE)
is_active = models.BooleanField(default=True)
to_alias = models.CharField(blank=True, max_length=40)
to_alias_type = models.CharField(blank=True, max_length=20)
objects = CallbackTokenManger()
class Meta:
abstract = True
get_latest_by = 'created_at'
ordering = ['-id']
unique_together = (('key', 'is_active'),)
def __str__(self):
return str(self.key)
class CallbackToken(AbstractBaseCallbackToken):
"""
Generates a random six digit number to be returned.
"""
key = models.CharField(default=generate_numeric_token, max_length=6, unique=True)
class Meta(AbstractBaseCallbackToken.Meta):
verbose_name = 'Callback Token'

View File

@ -0,0 +1,257 @@
import logging
from django.utils.translation import gettext_lazy as _
from django.contrib.auth import get_user_model
from django.core.exceptions import PermissionDenied
from django.core.validators import RegexValidator
from rest_framework import serializers
from djangopasswordlessknox.models import CallbackToken
from djangopasswordlessknox.settings import api_settings
from djangopasswordlessknox.utils import authenticate_by_token, verify_user_alias, validate_token_age
logger = logging.getLogger(__name__)
User = get_user_model()
class TokenField(serializers.CharField):
default_error_messages = {
'required': _('Invalid Token'),
'invalid': _('Invalid Token'),
'blank': _('Invalid Token'),
'max_length': _('Tokens are {max_length} digits long.'),
'min_length': _('Tokens are {min_length} digits long.')
}
class AbstractBaseAliasAuthenticationSerializer(serializers.Serializer):
"""
Abstract class that returns a callback token based on the field given
Returns a token if valid, None or a message if not.
"""
@property
def alias_type(self):
# The alias type, either email or mobile
raise NotImplementedError
def validate(self, attrs):
alias = attrs.get(self.alias_type)
print("ALIAS", alias)
print("ALIAS TYPE", self.alias_type)
if alias:
# Create or authenticate a user
# Return THem
if api_settings.PASSWORDLESS_REGISTER_NEW_USERS is True:
# If new aliases should register new users.
try:
user = User.objects.get(**{self.alias_type: alias})
user_created = False
except User.DoesNotExist:
# If no user is found, raise an error
msg = ""
if self.alias_type == 'email':
msg = _('No user found with this email.')
elif self.alias_type == 'mobile':
msg = _('No user found with this mobile number.')
raise serializers.ValidationError(msg)
else:
# If new aliases should not register new users.
try:
user = User.objects.get(**{self.alias_type: alias})
except User.DoesNotExist:
user = None
if user:
if not user.is_active:
# If valid, return attrs so we can create a token in our logic controller
msg = _('User account is disabled.')
raise serializers.ValidationError(msg)
else:
msg = _('No account is associated with this alias.')
raise serializers.ValidationError(msg)
else:
msg = _('Missing %s.') % self.alias_type
raise serializers.ValidationError(msg)
attrs['user'] = user
return attrs
class EmailAuthSerializer(AbstractBaseAliasAuthenticationSerializer):
@property
def alias_type(self):
return 'email'
email = serializers.EmailField()
class MobileAuthSerializer(AbstractBaseAliasAuthenticationSerializer):
@property
def alias_type(self):
return 'mobile'
phone_regex = RegexValidator(regex=r'^[7|9][0-9]{6}$',
message="Mobile number must be entered in the format:"
" '7xxxxxx' or '9xxxxxx'.")
mobile = serializers.CharField(validators=[phone_regex], max_length=15)
"""
Verification
"""
class AbstractBaseAliasVerificationSerializer(serializers.Serializer):
"""
Abstract class that returns a callback token based on the field given
Returns a token if valid, None or a message if not.
"""
@property
def alias_type(self):
# The alias type, either email or mobile
raise NotImplementedError
def validate(self, attrs):
msg = _('There was a problem with your request.')
if self.alias_type:
# Get request.user
# Get their specified valid endpoint
# Validate
request = self.context["request"]
if request and hasattr(request, "user"):
user = request.user
if user:
if not user.is_active:
# If valid, return attrs so we can create a token in our logic controller
msg = _('User account is disabled.')
else:
if hasattr(user, self.alias_type):
# Has the appropriate alias type
attrs['user'] = user
return attrs
else:
msg = _('This user doesn\'t have an %s.' % self.alias_type)
raise serializers.ValidationError(msg)
else:
msg = _('Missing %s.') % self.alias_type
raise serializers.ValidationError(msg)
class EmailVerificationSerializer(AbstractBaseAliasVerificationSerializer):
@property
def alias_type(self):
return 'email'
class MobileVerificationSerializer(AbstractBaseAliasVerificationSerializer):
@property
def alias_type(self):
return 'mobile'
"""
Callback Token
"""
def token_age_validator(value):
"""
Check token age
Makes sure a token is within the proper expiration datetime window.
"""
valid_token = validate_token_age(value)
if not valid_token:
raise serializers.ValidationError("The token you entered isn't valid.")
return value
class AbstractBaseCallbackTokenSerializer(serializers.Serializer):
"""
Abstract class inspired by DRF's own token serializer.
Returns a user if valid, None or a message if not.
"""
token = TokenField(min_length=6, max_length=6, validators=[token_age_validator])
class CallbackTokenAuthSerializer(AbstractBaseCallbackTokenSerializer):
def validate(self, attrs):
callback_token = attrs.get('token', None)
token = CallbackToken.objects.get(key=callback_token, is_active=True)
if token:
# Check the token type for our uni-auth method.
# authenticates and checks the expiry of the callback token.
user = authenticate_by_token(token)
if user:
if not user.is_active:
msg = _('User account is disabled.')
raise serializers.ValidationError(msg)
if api_settings.PASSWORDLESS_USER_MARK_EMAIL_VERIFIED \
or api_settings.PASSWORDLESS_USER_MARK_MOBILE_VERIFIED:
# Mark this alias as verified
user = User.objects.get(pk=token.user.pk)
success = verify_user_alias(user, token)
if success is False:
msg = _('Error validating user alias.')
raise serializers.ValidationError(msg)
attrs['user'] = user
return attrs
else:
msg = _('Invalid Token')
raise serializers.ValidationError(msg)
else:
msg = _('Missing authentication token.')
raise serializers.ValidationError(msg)
class CallbackTokenVerificationSerializer(AbstractBaseCallbackTokenSerializer):
"""
Takes a user and a token, verifies the token belongs to the user and
validates the alias that the token was sent from.
"""
def validate(self, attrs):
try:
user_id = self.context.get("user_id")
callback_token = attrs.get('token', None)
token = CallbackToken.objects.get(key=callback_token, is_active=True)
user = User.objects.get(pk=user_id)
if token.user == user:
# Check that the token.user is the request.user
# Mark this alias as verified
success = verify_user_alias(user, token)
if success is False:
logger.debug("djangopasswordlessknox: Error verifying alias.")
attrs['user'] = user
return attrs
else:
msg = _('This token is invalid. Try again later.')
logger.debug("djangopasswordlessknox: User token mismatch when verifying alias.")
except CallbackToken.DoesNotExist:
msg = _('Missing authentication token.')
logger.debug("djangopasswordlessknox: Tried to validate alias with bad token.")
pass
except User.DoesNotExist:
msg = _('Missing user.')
logger.debug("djangopasswordlessknox: Tried to validate alias with bad user.")
pass
except PermissionDenied:
msg = _('Insufficient permissions.')
logger.debug("djangopasswordlessknox: Permission denied while validating alias.")
pass
raise serializers.ValidationError(msg)

View File

@ -0,0 +1,23 @@
from djangopasswordlessknox.utils import (
create_callback_token_for_user,
send_email_with_callback_token,
send_sms_with_callback_token
)
class TokenService(object):
@staticmethod
def send_token(user, alias_type, **message_payload):
token = create_callback_token_for_user(user, alias_type)
send_action = None
if alias_type == 'email':
send_action = send_email_with_callback_token
elif alias_type == 'mobile':
send_action = send_sms_with_callback_token
if send_action is None:
raise ValueError(f"Invalid alias_type: {alias_type}")
# Send to alias
success = send_action(user, token, **message_payload)
return success

View File

@ -0,0 +1,79 @@
from django.conf import settings
from rest_framework.settings import APISettings
USER_SETTINGS = getattr(settings, 'PASSWORDLESS_AUTH', None)
DEFAULTS = {
# Allowed auth types, can be EMAIL, MOBILE, or both.
'PASSWORDLESS_AUTH_TYPES': ['EMAIL'],
# Amount of time that tokens last, in seconds
'PASSWORDLESS_TOKEN_EXPIRE_TIME': 15 * 60,
# The user's email field name
'PASSWORDLESS_USER_EMAIL_FIELD_NAME': 'email',
# The user's mobile field name
'PASSWORDLESS_USER_MOBILE_FIELD_NAME': 'mobile',
# Marks itself as verified the first time a user completes auth via token.
# Automatically unmarks itself if email is changed.
'PASSWORDLESS_USER_MARK_EMAIL_VERIFIED': False,
'PASSWORDLESS_USER_EMAIL_VERIFIED_FIELD_NAME': 'email_verified',
# Marks itself as verified the first time a user completes auth via token.
# Automatically unmarks itself if mobile number is changed.
'PASSWORDLESS_USER_MARK_MOBILE_VERIFIED': False,
'PASSWORDLESS_USER_MOBILE_VERIFIED_FIELD_NAME': 'mobile_verified',
# The email the callback token is sent from
'PASSWORDLESS_EMAIL_NOREPLY_ADDRESS': None,
# The email subject
'PASSWORDLESS_EMAIL_SUBJECT': "Your Login Token",
# A plaintext email message overridden by the html message. Takes one string.
'PASSWORDLESS_EMAIL_PLAINTEXT_MESSAGE': "Enter this token to sign in: %s",
# The email template name.
'PASSWORDLESS_EMAIL_TOKEN_HTML_TEMPLATE_NAME': "passwordless_default_token_email.html",
# Your twilio number that sends the callback tokens.
'PASSWORDLESS_MOBILE_NOREPLY_NUMBER': None,
# The message sent to mobile users logging in. Takes one string.
'PASSWORDLESS_MOBILE_MESSAGE': "Use this code to log in: %s",
# Registers previously unseen aliases as new users.
'PASSWORDLESS_REGISTER_NEW_USERS': False,
# Suppresses actual SMS for testing
'PASSWORDLESS_TEST_SUPPRESSION': False,
# Context Processors for Email Template
'PASSWORDLESS_CONTEXT_PROCESSORS': [],
# The verification email subject
'PASSWORDLESS_EMAIL_VERIFICATION_SUBJECT': "Your Verification Token",
# A plaintext verification email message overridden by the html message. Takes one string.
'PASSWORDLESS_EMAIL_VERIFICATION_PLAINTEXT_MESSAGE': "Enter this verification code: %s",
# The verification email template name.
'PASSWORDLESS_EMAIL_VERIFICATION_TOKEN_HTML_TEMPLATE_NAME': "passwordless_default_verification_token_email.html",
# The message sent to mobile users logging in. Takes one string.
'PASSWORDLESS_MOBILE_VERIFICATION_MESSAGE': "Enter this verification code: %s",
# Automatically send verification email or sms when a user changes their alias.
'PASSWORDLESS_AUTO_SEND_VERIFICATION_TOKEN': False,
}
# List of settings that may be in string import notation.
IMPORT_STRINGS = (
'PASSWORDLESS_EMAIL_TOKEN_HTML_TEMPLATE',
'PASSWORDLESS_CONTEXT_PROCESSORS',
)
api_settings = APISettings(USER_SETTINGS, DEFAULTS, IMPORT_STRINGS) #type: ignore

View File

@ -0,0 +1,118 @@
import logging
from django.contrib.auth import get_user_model
from django.dispatch import receiver
from django.db.models import signals
from djangopasswordlessknox.models import CallbackToken
from djangopasswordlessknox.models import generate_numeric_token
from djangopasswordlessknox.settings import api_settings
from djangopasswordlessknox.services import TokenService
logger = logging.getLogger(__name__)
@receiver(signals.pre_save, sender=CallbackToken)
def invalidate_previous_tokens(sender, instance, **kwargs):
"""
Invalidates all previously issued tokens as a post_save signal.
"""
active_tokens = None
if isinstance(instance, CallbackToken):
active_tokens = CallbackToken.objects.active().filter(user=instance.user).exclude(id=instance.id)
# Invalidate tokens
if active_tokens:
for token in active_tokens:
token.is_active = False
token.save()
@receiver(signals.pre_save, sender=CallbackToken)
def check_unique_tokens(sender, instance, **kwargs):
"""
Ensures that mobile and email tokens are unique or tries once more to generate.
"""
if isinstance(instance, CallbackToken):
if CallbackToken.objects.filter(key=instance.key, is_active=True).exists():
instance.key = generate_numeric_token()
User = get_user_model()
@receiver(signals.pre_save, sender=User)
def update_alias_verification(sender, instance, **kwargs):
"""
Flags a user's email as unverified if they change it.
Optionally sends a verification token to the new endpoint.
"""
if isinstance(instance, User):
if instance.id:
if api_settings.PASSWORDLESS_USER_MARK_EMAIL_VERIFIED is True:
"""
For marking email aliases as not verified when a user changes it.
"""
email_field = api_settings.PASSWORDLESS_USER_EMAIL_FIELD_NAME
email_verified_field = api_settings.PASSWORDLESS_USER_EMAIL_VERIFIED_FIELD_NAME
# Verify that this is an existing instance and not a new one.
try:
user_old = User.objects.get(id=instance.id) # Pre-save object
instance_email = getattr(instance, email_field) # Incoming Email
old_email = getattr(user_old, email_field) # Pre-save object email
if instance_email != old_email and instance_email != "" and instance_email is not None:
# Email changed, verification should be flagged
setattr(instance, email_verified_field, False)
if api_settings.PASSWORDLESS_AUTO_SEND_VERIFICATION_TOKEN is True:
email_subject = api_settings.PASSWORDLESS_EMAIL_VERIFICATION_SUBJECT
email_plaintext = api_settings.PASSWORDLESS_EMAIL_VERIFICATION_PLAINTEXT_MESSAGE
email_html = api_settings.PASSWORDLESS_EMAIL_VERIFICATION_TOKEN_HTML_TEMPLATE_NAME
message_payload = {'email_subject': email_subject,
'email_plaintext': email_plaintext,
'email_html': email_html}
success = TokenService.send_token(instance, 'email', **message_payload)
if success:
logger.info('djangopasswordlessknox: Successfully sent email on updated address: %s'
% instance_email)
else:
logger.info('djangopasswordlessknox: Failed to send email to updated address: %s'
% instance_email)
except User.DoesNotExist:
# User probably is just initially being created
setattr(instance, email_verified_field, True)
if api_settings.PASSWORDLESS_USER_MARK_MOBILE_VERIFIED is True:
"""
For marking mobile aliases as not verified when a user changes it.
"""
mobile_field = api_settings.PASSWORDLESS_USER_MOBILE_FIELD_NAME
mobile_verified_field = api_settings.PASSWORDLESS_USER_MOBILE_VERIFIED_FIELD_NAME
# Verify that this is an existing instance and not a new one.
try:
user_old = User.objects.get(id=instance.id) # Pre-save object
instance_mobile = getattr(instance, mobile_field) # Incoming mobile
old_mobile = getattr(user_old, mobile_field) # Pre-save object mobile
if instance_mobile != old_mobile and instance_mobile != "" and instance_mobile is not None:
# Mobile changed, verification should be flagged
setattr(instance, mobile_verified_field, False)
if api_settings.PASSWORDLESS_AUTO_SEND_VERIFICATION_TOKEN is True:
mobile_message = api_settings.PASSWORDLESS_MOBILE_MESSAGE
message_payload = {'mobile_message': mobile_message}
success = TokenService.send_token(instance, 'mobile', **message_payload)
if success:
logger.info('djangopasswordlessknox: Successfully sent SMS on updated mobile: %s'
% instance_mobile)
else:
logger.info('djangopasswordlessknox: Failed to send SMS to updated mobile: %s'
% instance_mobile)
except User.DoesNotExist:
# User probably is just initially being created
setattr(instance, mobile_verified_field, True)

View File

@ -0,0 +1,88 @@
<!doctype html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<title>Your Login Token</title>
<style>
body {
font-family: Arial, sans-serif;
background-color: #f5f5f5;
margin: 0;
padding: 20px;
}
.container {
max-width: 600px;
margin: 0 auto;
background-color: #ffffff;
border-radius: 8px;
padding: 30px;
box-shadow: 0 2px 4px rgba(0, 0, 0, 0.1);
}
.header {
text-align: center;
margin-bottom: 30px;
}
.logo {
color: #2c3e50;
font-size: 24px;
font-weight: bold;
}
.token-container {
background-color: #f8f9fa;
border: 2px solid #e9ecef;
border-radius: 6px;
padding: 20px;
text-align: center;
margin: 20px 0;
}
.token {
color: #007bff;
font-size: 32px;
font-weight: bold;
letter-spacing: 2px;
}
.message {
color: #6c757d;
font-size: 16px;
line-height: 1.5;
margin-top: 20px;
}
.footer {
margin-top: 30px;
text-align: center;
color: #6c757d;
font-size: 14px;
}
</style>
</head>
<body>
<div class="container">
<div class="header">
<div class="logo">SARLink Portal</div>
</div>
<div class="token-container">
<div class="message">Your login code is:</div>
<div class="token">{{ callback_token }}</div>
</div>
<div class="message">
Please use this code to complete your login. This code will
expire in 15 minutes for security purposes.
</div>
<div class="footer">
If you didn't request this code, please ignore this email.
<br />
© 2024 Council Portal. All rights reserved.
</div>
</div>
</body>
</html>

View File

@ -0,0 +1,10 @@
<!doctype html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<title>Your Verification Token</title>
</head>
<body>
<h2>Use this verification code: {{ callback_token }}</h2>
</body>
</html>

View File

@ -0,0 +1,18 @@
from django.urls import path
from djangopasswordlessknox.views import (
ObtainEmailCallbackToken,
ObtainMobileCallbackToken,
ObtainAuthTokenFromCallbackToken,
VerifyAliasFromCallbackToken,
ObtainEmailVerificationCallbackToken,
ObtainMobileVerificationCallbackToken,
)
urlpatterns = [
path('callback/auth/', ObtainAuthTokenFromCallbackToken.as_view(), name='auth_callback'),
path('auth/email/', ObtainEmailCallbackToken.as_view(), name='auth_email'),
path('auth/mobile/', ObtainMobileCallbackToken.as_view(), name='auth_mobile'),
path('callback/verify/', VerifyAliasFromCallbackToken.as_view(), name='verify_callback'),
path('verify/email/', ObtainEmailVerificationCallbackToken.as_view(), name='verify_email'),
path('verify/mobile/', ObtainMobileVerificationCallbackToken.as_view(), name='verify_mobile'),
]

View File

@ -0,0 +1,211 @@
import logging
# import os
from django.contrib.auth import get_user_model
from django.core.exceptions import PermissionDenied
from django.core.mail import send_mail
from django.template import loader
from django.utils import timezone
from djangopasswordlessknox.models import CallbackToken
from djangopasswordlessknox.settings import api_settings
# from twilio.rest import Client
from decouple import config
import requests
import json
logger = logging.getLogger(__name__)
User = get_user_model()
def authenticate_by_token(callback_token):
try:
token = CallbackToken.objects.get(key=callback_token, is_active=True)
# Returning a user designates a successful authentication.
token.user = User.objects.get(pk=token.user.pk)
token.is_active = False # Mark this token as used.
token.save()
return token.user
except CallbackToken.DoesNotExist:
logger.debug("djangopasswordlessknox: Challenged with a callback token that doesn't exist.")
except User.DoesNotExist:
logger.debug("djangopasswordlessknox: Authenticated user somehow doesn't exist.")
except PermissionDenied:
logger.debug("djangopasswordlessknox: Permission denied while authenticating.")
return None
def create_callback_token_for_user(user, token_type):
token = None
token_type = token_type.upper()
if token_type == 'EMAIL':
token = CallbackToken.objects.create(user=user,
to_alias_type=token_type,
to_alias=getattr(user, api_settings.PASSWORDLESS_USER_EMAIL_FIELD_NAME))
elif token_type == 'MOBILE':
token = CallbackToken.objects.create(user=user,
to_alias_type=token_type,
to_alias=getattr(user, api_settings.PASSWORDLESS_USER_MOBILE_FIELD_NAME))
if token is not None:
return token
return None
def validate_token_age(callback_token):
"""
Returns True if a given token is within the age expiration limit.
"""
try:
token = CallbackToken.objects.get(key=callback_token, is_active=True)
seconds = (timezone.now() - token.created_at).total_seconds()
token_expiry_time = api_settings.PASSWORDLESS_TOKEN_EXPIRE_TIME
if seconds <= token_expiry_time:
return True
else:
# Invalidate our token.
token.is_active = False
token.save()
return False
except CallbackToken.DoesNotExist:
# No valid token.
return False
def verify_user_alias(user, token):
"""
Marks a user's contact point as verified depending on accepted token type.
"""
if token.to_alias_type == 'EMAIL':
if token.to_alias == getattr(user, api_settings.PASSWORDLESS_USER_EMAIL_FIELD_NAME):
setattr(user, api_settings.PASSWORDLESS_USER_EMAIL_VERIFIED_FIELD_NAME, True)
elif token.to_alias_type == 'MOBILE':
if token.to_alias == getattr(user, api_settings.PASSWORDLESS_USER_MOBILE_FIELD_NAME):
setattr(user, api_settings.PASSWORDLESS_USER_MOBILE_VERIFIED_FIELD_NAME, True)
else:
return False
user.save()
return True
def inject_template_context(context):
"""
Injects additional context into email template.
"""
for processor in api_settings.PASSWORDLESS_CONTEXT_PROCESSORS:
context.update(processor())
return context
def send_email_with_callback_token(user, email_token, **kwargs):
"""
Sends a Email to user.email.
Passes silently without sending in test environment
"""
try:
if api_settings.PASSWORDLESS_EMAIL_NOREPLY_ADDRESS:
# Make sure we have a sending address before sending.
# Get email subject and message
email_subject = kwargs.get('email_subject',
api_settings.PASSWORDLESS_EMAIL_SUBJECT)
email_plaintext = kwargs.get('email_plaintext',
api_settings.PASSWORDLESS_EMAIL_PLAINTEXT_MESSAGE)
email_html = kwargs.get('email_html',
api_settings.PASSWORDLESS_EMAIL_TOKEN_HTML_TEMPLATE_NAME)
# Inject context if user specifies.
context = inject_template_context({'callback_token': email_token.key, })
html_message = loader.render_to_string(email_html, context,)
send_mail(
email_subject,
email_plaintext % email_token.key,
api_settings.PASSWORDLESS_EMAIL_NOREPLY_ADDRESS,
[getattr(user, api_settings.PASSWORDLESS_USER_EMAIL_FIELD_NAME)],
fail_silently=False,
html_message=html_message,)
else:
logger.debug("Failed to send token email. Missing PASSWORDLESS_EMAIL_NOREPLY_ADDRESS.")
return False
return True
except Exception as e:
logger.debug("Failed to send token email to user: %d."
"Possibly no email on user object. Email entered was %s" %
(user.id, getattr(user, api_settings.PASSWORDLESS_USER_EMAIL_FIELD_NAME)))
logger.debug(e)
return False
def send_sms_with_callback_token(user, mobile_token, **kwargs):
"""
Sends a SMS to user.mobile via Twilio.
Passes silently without sending in test environment.
"""
base_string = kwargs.get('mobile_message', api_settings.PASSWORDLESS_MOBILE_MESSAGE)
try:
if api_settings.PASSWORDLESS_MOBILE_NOREPLY_NUMBER:
print("Sending SMS")
# We need a sending number to send properly
if api_settings.PASSWORDLESS_TEST_SUPPRESSION is True:
# we assume success to prevent spamming SMS during testing.
return True
to_number = getattr(user, api_settings.PASSWORDLESS_USER_MOBILE_FIELD_NAME)
if to_number.__class__.__name__ == 'PhoneNumber':
to_number = to_number.__str__()
# user_withh_mobile_exists = User.objects.filter(mobile=to_number).exists()
# if not user_withh_mobile_exists:
# print("User with mobile number does not exist.")
# logger.debug("User with mobile number does not exist.")
# return False
api_url = config("SMS_API_URL")
api_key = config("SMS_API_KEY")
if not api_url or not api_key:
logger.debug("Failed to send SMS. Missing SMS_API_URL or SMS_API_KEY.")
return False
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {api_key}"
}
data = {
"number": to_number,
"message": base_string % mobile_token.key,
"check_delivery": False
}
response = requests.post(api_url, headers=headers, data=json.dumps(data))
if response.status_code == 200:
return True
else:
logger.debug(f"Failed to send SMS. Status code: {response.status_code}")
return False
else:
logger.debug("Failed to send token sms. Missing PASSWORDLESS_MOBILE_NOREPLY_NUMBER.")
return False
except ImportError:
logger.debug("Couldn't import Twilio client. Is twilio installed?")
return False
except KeyError:
logger.debug("Couldn't send SMS."
"Did you set your Twilio account tokens and specify a PASSWORDLESS_MOBILE_NOREPLY_NUMBER?")
except Exception as e:
logger.debug("Failed to send token SMS to user: {}. "
"Possibly no mobile number on user object or the twilio package isn't set up yet. "
"Number entered was {}".format(user.id, getattr(user, api_settings.PASSWORDLESS_USER_MOBILE_FIELD_NAME)))
logger.debug(e)
return False

View File

@ -0,0 +1,213 @@
import logging
from rest_framework import parsers, renderers, status
from rest_framework.authtoken.models import Token
from rest_framework.response import Response
from rest_framework.permissions import AllowAny, IsAuthenticated
from rest_framework.views import APIView
from rest_framework import generics
from djangopasswordlessknox.settings import api_settings
from knox.models import AuthTokenManager, AuthToken
from djangopasswordlessknox.serializers import (
EmailAuthSerializer,
MobileAuthSerializer,
CallbackTokenAuthSerializer,
CallbackTokenVerificationSerializer,
EmailVerificationSerializer,
MobileVerificationSerializer,
)
from djangopasswordlessknox.services import TokenService
from djangopasswordlessknox.services import TokenService
from knox.settings import knox_settings
from rest_framework.serializers import DateTimeField
from django.utils import timezone
from django.contrib.auth.signals import user_logged_in, user_logged_out
logger = logging.getLogger(__name__)
class AbstractBaseObtainCallbackToken(APIView):
"""
This returns a 6-digit callback token we can trade for a user's Auth Token.
"""
success_response = "A login token has been sent to you."
failure_response = "Unable to send you a login code. Try again later."
message_payload = {}
@property
def serializer_class(self):
# Our serializer depending on type
raise NotImplementedError
@property
def alias_type(self):
# Alias Type
raise NotImplementedError
def post(self, request, *args, **kwargs):
if self.alias_type.upper() not in api_settings.PASSWORDLESS_AUTH_TYPES:
# Only allow auth types allowed in settings.
return Response(status=status.HTTP_404_NOT_FOUND)
serializer = self.serializer_class(data=request.data, context={'request': request})
if serializer.is_valid(raise_exception=True):
# Validate -
user = serializer.validated_data['user']
# Create and send callback token
success = TokenService.send_token(user, self.alias_type, **self.message_payload)
# Respond With Success Or Failure of Sent
if success:
status_code = status.HTTP_200_OK
response_detail = self.success_response
else:
status_code = status.HTTP_400_BAD_REQUEST
response_detail = self.failure_response
return Response({'detail': response_detail}, status=status_code)
else:
return Response(serializer.error_messages, status=status.HTTP_400_BAD_REQUEST)
class ObtainEmailCallbackToken(AbstractBaseObtainCallbackToken, generics.GenericAPIView):
permission_classes = (AllowAny,)
serializer_class = EmailAuthSerializer
success_response = "A login token has been sent to your email."
failure_response = "Unable to email you a login code. Try again later."
alias_type = 'email'
email_subject = api_settings.PASSWORDLESS_EMAIL_SUBJECT
email_plaintext = api_settings.PASSWORDLESS_EMAIL_PLAINTEXT_MESSAGE
email_html = api_settings.PASSWORDLESS_EMAIL_TOKEN_HTML_TEMPLATE_NAME
message_payload = {'email_subject': email_subject,
'email_plaintext': email_plaintext,
'email_html': email_html}
class ObtainMobileCallbackToken(AbstractBaseObtainCallbackToken, generics.GenericAPIView):
permission_classes = (AllowAny,)
serializer_class = MobileAuthSerializer
success_response = "We texted you a login code."
failure_response = "Unable to send you a login code. Try again later."
alias_type = 'mobile'
mobile_message = api_settings.PASSWORDLESS_MOBILE_MESSAGE
message_payload = {'mobile_message': mobile_message}
class ObtainEmailVerificationCallbackToken(AbstractBaseObtainCallbackToken, generics.GenericAPIView):
permission_classes = (IsAuthenticated,)
serializer_class = EmailVerificationSerializer
success_response = "A verification token has been sent to your email."
failure_response = "Unable to email you a verification code. Try again later."
alias_type = 'email'
email_subject = api_settings.PASSWORDLESS_EMAIL_VERIFICATION_SUBJECT
email_plaintext = api_settings.PASSWORDLESS_EMAIL_VERIFICATION_PLAINTEXT_MESSAGE
email_html = api_settings.PASSWORDLESS_EMAIL_VERIFICATION_TOKEN_HTML_TEMPLATE_NAME
message_payload = {
'email_subject': email_subject,
'email_plaintext': email_plaintext,
'email_html': email_html
}
class ObtainMobileVerificationCallbackToken(AbstractBaseObtainCallbackToken, generics.GenericAPIView):
permission_classes = (IsAuthenticated,)
serializer_class = MobileVerificationSerializer
success_response = "We texted you a verification code."
failure_response = "Unable to send you a verification code. Try again later."
alias_type = 'mobile'
mobile_message = api_settings.PASSWORDLESS_MOBILE_MESSAGE
message_payload = {'mobile_message': mobile_message}
class AbstractBaseObtainAuthToken(APIView):
"""
This is a duplicate of rest_framework's own ObtainAuthToken method.
Instead, this returns an Auth Token based on our 6 digit callback token and source.
"""
serializer_class = None
def get_context(self):
return {'request': self.request, 'format': self.format_kwarg, 'view': self}
def get_token_ttl(self):
return knox_settings.TOKEN_TTL
def get_token_limit_per_user(self):
return knox_settings.TOKEN_LIMIT_PER_USER
def get_user_serializer_class(self):
return knox_settings.USER_SERIALIZER
def get_expiry_datetime_format(self):
return knox_settings.EXPIRY_DATETIME_FORMAT
def format_expiry_datetime(self, expiry):
datetime_format = self.get_expiry_datetime_format()
return DateTimeField(format=datetime_format).to_representation(expiry)
def get_post_response_data(self, user, token, instance):
UserSerializer = self.get_user_serializer_class()
data = {
'expiry': self.format_expiry_datetime(instance.expiry),
'token': token
}
if UserSerializer is not None:
data["user"] = UserSerializer(
user,
context=self.get_context()
).data
return data
def post(self, request, format=None):
token_limit_per_user = self.get_token_limit_per_user()
serializer = self.serializer_class(data=request.data)
if serializer.is_valid(raise_exception=True):
user = serializer.validated_data['user']
if token_limit_per_user is not None:
now = timezone.now()
token = user.auth_token_set.filter(expiry__gt=now)
if token.count() >= token_limit_per_user:
return Response(
{"error": "Maximum amount of tokens allowed per user exceeded."},
status=status.HTTP_403_FORBIDDEN
)
token_ttl = self.get_token_ttl()
instance, token = AuthToken.objects.create(user, token_ttl)
user_logged_in.send(sender=user.__class__,
request=request, user=user)
data = self.get_post_response_data(user, token, instance)
return Response(data)
class ObtainAuthTokenFromCallbackToken(AbstractBaseObtainAuthToken, generics.GenericAPIView):
"""
This is a duplicate of rest_framework's own ObtainAuthToken method.
Instead, this returns an Auth Token based on our callback token and source.
"""
permission_classes = (AllowAny,)
serializer_class = CallbackTokenAuthSerializer
class VerifyAliasFromCallbackToken(APIView):
"""
This verifies an alias on correct callback token entry using the same logic as auth.
Should be refactored at some point.
"""
serializer_class = CallbackTokenVerificationSerializer
def post(self, request, *args, **kwargs):
serializer = self.serializer_class(data=request.data, context={'user_id': self.request.user.id})
if serializer.is_valid(raise_exception=True):
return Response({'detail': 'Alias verified.'}, status=status.HTTP_200_OK)
else:
logger.error("Couldn't verify unknown user. Errors on serializer: {}".format(serializer.error_messages))
return Response({'detail': 'We couldn\'t verify this alias. Try again later.'}, status.HTTP_400_BAD_REQUEST)