Django REST Framework 验证 JWT token 唯一性
Contents
什么是JWT
JWT 全称 JSON Web Token,具体介绍可以参考:
https://www.ruanyifeng.com/blog/2018/07/json_web_token-tutorial.html
为什么要JWT唯一性
JWT 认证不像 session 认证那样在服务端(数据库)保存会话记录,而是仅凭 token 本身完成认证。因此,如果系统不希望同一个账号同时在多处登录,就需要对用户 token 的唯一性进行校验。
实现方法
重写 rest_framework_jwt 的认证方法并加入缓存。
用户登录时将 token 存储在 Redis 中;再次登录时,新 token 会替换掉缓存中原有的 token,旧 token 随即失效。
重写登录认证
class HotNestLoginSerializer(Serializer):
    """Login serializer that issues a JWT and records it in the cache.

    On a successful login the freshly issued token is written to the cache
    under ``user.email + settings.REDIS_TOKEN_SUFFIX``, overwriting whatever
    was stored under that key before.
    """

    def __init__(self, *args, **kwargs):
        """Register the login-name and password fields on the serializer."""
        super(HotNestLoginSerializer, self).__init__(*args, **kwargs)
        self.fields[self.username_field] = serializers.CharField()
        self.fields['password'] = PasswordField(write_only=True)

    @property
    def username_field(self):
        """Return the login field name reported by ``get_username_field()``."""
        return get_username_field()

    def validate(self, attrs):
        """Authenticate the submitted credentials and issue a token.

        Args:
            attrs: Field values submitted to the serializer; the login name
                is read from ``self.username_field`` and the password from
                ``'password'``.

        Returns:
            A dict with the encoded ``'token'`` and the authenticated
            ``'user'``.

        Raises:
            serializers.ValidationError: With ``NO_REGISTER`` when no user
                has the login name as username or email; with a
                "Must include ..." message when a credential is empty; with
                ``ACCOUNT_PASSWORD_ERROR`` when authentication fails; with
                "User account is disabled." when the user is inactive.
        """
        login_name = attrs.get(self.username_field)
        credentials = {
            self.username_field: login_name,
            'password': attrs.get('password'),
        }

        # Only existence matters here, so ask the database for a boolean
        # instead of ordering and loading a full row with .first().
        is_registered = User.objects.filter(
            Q(username=login_name) | Q(email=login_name)
        ).exists()
        if not is_registered:
            raise serializers.ValidationError(detail=NO_REGISTER)

        if not all(credentials.values()):
            msg = _('Must include "{username_field}" and "password".')
            msg = msg.format(username_field=self.username_field)
            raise serializers.ValidationError(msg)

        user = authenticate(**credentials)
        if not user:
            raise serializers.ValidationError(detail=ACCOUNT_PASSWORD_ERROR)

        if not user.is_active:
            msg = _('User account is disabled.')
            raise serializers.ValidationError(msg)

        payload = jwt_payload_handler(user)
        token = jwt_encode_handler(payload)

        # Storing the token under the per-user key overwrites the previous
        # one, which is what invalidates earlier logins.
        # NOTE(review): the key is built from user.email, so users sharing
        # an email value (e.g. an empty one) would share a cache slot -
        # confirm emails are unique and non-empty.
        # NOTE(review): 36800 * 7 seconds is 257600 s (just under 3 days);
        # possibly 86400 * 7 (7 days) was intended - confirm against the
        # configured JWT expiration.
        cache.set(user.email + settings.REDIS_TOKEN_SUFFIX, token, timeout=36800 * 7)

        return {
            'token': token,
            'user': user,
        }
重写 token verify 接口
class HotNestJSONWebTokenAuthentication(BaseJSONWebTokenAuthentication):
    """JWT authentication that accepts only the token cached for the user.

    A token is accepted only when it equals the value stored in the cache
    under ``<email claim> + settings.REDIS_TOKEN_SUFFIX``. The check is
    applied to tokens read from the Authorization header and to tokens read
    from the ``JWT_AUTH_COOKIE`` cookie alike.
    """

    def get_jwt_value(self, request):
        """Return the request's JWT after verifying it is the cached one.

        Args:
            request: The incoming request; its Authorization header and,
                when ``api_settings.JWT_AUTH_COOKIE`` is set, its cookies
                are inspected.

        Returns:
            The raw token (bytes from the header, or the cookie value), or
            ``None`` when the request carries no token for this scheme.

        Raises:
            exceptions.AuthenticationFailed: When the Authorization header
                has the expected prefix but no credentials, or more than
                one credential part.
            serializers.ValidationError: With ``TOKEN_EXPIRED`` when the
                token cannot be decoded, has no email claim, or differs
                from the token stored in the cache.
        """
        token = self._read_token(request)
        if token is None:
            return None
        # Applied to cookie tokens too; otherwise a superseded token kept
        # in a cookie would bypass the single-session check.
        self._ensure_current_token(token)
        return token

    def _read_token(self, request):
        """Extract the raw token from the header or cookie, without checks."""
        auth = get_authorization_header(request).split()
        auth_header_prefix = api_settings.JWT_AUTH_HEADER_PREFIX.lower()

        if not auth:
            # No Authorization header: fall back to the cookie if enabled.
            if api_settings.JWT_AUTH_COOKIE:
                return request.COOKIES.get(api_settings.JWT_AUTH_COOKIE)
            return None

        if smart_text(auth[0].lower()) != auth_header_prefix:
            # Header belongs to a different authentication scheme.
            return None

        if len(auth) == 1:
            msg = _('Invalid Authorization header. No credentials provided.')
            raise exceptions.AuthenticationFailed(msg)
        if len(auth) > 2:
            msg = _('Invalid Authorization header. Credentials string '
                    'should not contain spaces.')
            raise exceptions.AuthenticationFailed(msg)

        return auth[1]

    def _ensure_current_token(self, token):
        """Raise ``TOKEN_EXPIRED`` unless ``token`` matches the cached one."""
        try:
            info = jwt_decode_handler(token)
        except Exception:
            # Every decode failure is reported to the client the same way.
            # NOTE(review): this raises ValidationError rather than
            # exceptions.AuthenticationFailed - confirm the resulting
            # response status is the intended one.
            raise serializers.ValidationError(detail=TOKEN_EXPIRED)

        # A validly signed token without an email claim cannot be matched
        # to a cache entry; reject it instead of failing with KeyError.
        email = info.get('email')
        if email is None:
            raise serializers.ValidationError(detail=TOKEN_EXPIRED)

        cache_token = cache.get(email + settings.REDIS_TOKEN_SUFFIX)

        # Header tokens arrive as bytes and cookie tokens as text; compare
        # both as text against the cached value.
        presented = token.decode("utf-8") if isinstance(token, bytes) else token
        if cache_token != presented:
            raise serializers.ValidationError(detail=TOKEN_EXPIRED)