Contents

Django Restful Framework 验证JWT token唯一性

什么是JWT

JWT 全称 JSON Web Token 具体可以看 https://www.ruanyifeng.com/blog/2018/07/json_web_token-tutorial.html

为什么要JWT唯一性

因为JWT 认证不像session 认证有数据库记录一样, JWT 是靠token 来进行认证的,如果一个系统不希望一个账号多处登陆,就要对用户的token唯一性做认证

实现方法

重写 rest_framework_jwt的认证方法并加入缓存。

用户登陆时将token储存在redis中, 登陆时替换掉原有的token。

重写登陆认证

class HotNestLoginSerializer(Serializer):
   def __init__(self, *args, **kwargs):
       """
       登陆验证
       """
       super(HotNestLoginSerializer, self).__init__(*args, **kwargs)

       self.fields[self.username_field] = serializers.CharField()
       self.fields['password'] = PasswordField(write_only=True)

   @property
   def username_field(self):
       return get_username_field()

   def validate(self, attrs):
       credentials = {
           self.username_field: attrs.get(self.username_field),
           'password': attrs.get('password')
       }
       has_reg = User.objects.filter(
           Q(username=attrs.get(self.username_field)) | Q(email=attrs.get(self.username_field))).first()
       if not has_reg:
           raise serializers.ValidationError(detail=NO_REGISTER)

       if all(credentials.values()):
           user = authenticate(**credentials)

           if user:
               if not user.is_active:
                   msg = _('User account is disabled.')
                   raise serializers.ValidationError(msg)

               payload = jwt_payload_handler(user)
               token = jwt_encode_handler(payload)

               cache.set(user.email + settings.REDIS_TOKEN_SUFFIX, token, timeout=36800 * 7)

               return {
                   'token': token,
                   'user': user
               }

           else:
               raise serializers.ValidationError(detail=ACCOUNT_PASSWORD_ERROR)
       else:
           msg = _('Must include "{username_field}" and "password".')
           msg = msg.format(username_field=self.username_field)
           raise serializers.ValidationError(msg)

重写 token verify 接口

class HotNestJSONWebTokenAuthentication(BaseJSONWebTokenAuthentication):

   def get_jwt_value(self, request):
       auth = get_authorization_header(request).split()
       auth_header_prefix = api_settings.JWT_AUTH_HEADER_PREFIX.lower()

       if not auth:
           if api_settings.JWT_AUTH_COOKIE:
               return request.COOKIES.get(api_settings.JWT_AUTH_COOKIE)
           return None

       if smart_text(auth[0].lower()) != auth_header_prefix:
           return None

       if len(auth) == 1:
           msg = _('Invalid Authorization header. No credentials provided.')
           raise exceptions.AuthenticationFailed(msg)
       elif len(auth) > 2:
           msg = _('Invalid Authorization header. Credentials string '
                   'should not contain spaces.')
           raise exceptions.AuthenticationFailed(msg)
       try:
           info = jwt_decode_handler(auth[1])
       except Exception as e:
           raise serializers.ValidationError(detail=TOKEN_EXPIRED)
       
       cache_token = cache.get(info['email'] + settings.REDIS_TOKEN_SUFFIX)
       if cache_token ==auth[1].decode("utf-8"):
           return auth[1]
       else:
           raise serializers.ValidationError(detail=TOKEN_EXPIRED)