部分内容参考于jwt认证流程

代码实现(持续更新中):Gitee

安装

首先安装pyjwt:

pip3 install pyjwt

使用

创建token

在根目录目录新建util文件夹,新建jwt_auth.py,代码如下:

import jwt
import datetime
from django.conf import settings
from jwt import exceptions

JWT_SALT = settings.SECRET_KEY


def create_token(payload, timeout=20):
    """
    :param payload:  例如:{'user_id':1,'username':'wupeiqi'}用户信息
    :param timeout: token的过期时间,默认20分钟
    :return:
    """
    headers = {
        'typ': 'jwt',
        'alg': 'HS256'
    }
    payload['exp'] = datetime.datetime.utcnow() + datetime.timedelta(minutes=timeout)
    result = jwt.encode(payload=payload, key=JWT_SALT, algorithm="HS256", headers=headers).decode('utf-8')
    return result


def parse_payload(token):
    """
    对token进行和发行校验并获取payload
    :param token:
    :return:
    """
    result = {'status': False, 'data': None, 'error': None}
    try:
        verified_payload = jwt.decode(token, JWT_SALT, True)
        result['status'] = True
        result['data'] = verified_payload
    except exceptions.ExpiredSignatureError:
        result['error'] = 'token已失效'
    except jwt.DecodeError:
        result['error'] = 'token认证失败'
    except jwt.InvalidTokenError:
        result['error'] = '非法的token'
    return result

其中JWT_SALT是加密需要用到的盐,我这里取的是Django的默认的SECRET

在你的登录接口使用create_token方法:

# 登录获取token
@require_http_methods(["POST"])
@method_decorator(csrf_exempt, name='dispatch')
def login(request):
    response = {}
    try:
        data = json.loads(request.body)
        user = data.get('username')
        pwd = hashlib.sha1(data.get('password').encode('utf-8')).hexdigest()
        user_obj = Creater.objects.filter(name=user,password=pwd).first()
        if not user_obj:
            response['msg'] = '用户账户或密码错误'
            response['error_num'] = 1
            return JsonResponse(response)
        token = create_token({'username': user})
        user_obj.token = token
        user_obj.save()
        response['token'] = token
        response['msg'] = 'SUCCESS'
        response['error_num'] = 0          
    except Exception as e:
        response['msg'] = str(e)
        response['error_num'] = 1
    return JsonResponse(response)

思路:显示从数据库中获取信息判断用户是否存在,如果存在,执行token = create_token({'username': user}),并将token存入数据库中

解析token

我们需要用到util中的另一个方法parse_payload,我打算用装饰器的方式写,这样可以区分一些路由需要权限与否,首先在根目录新建decorator文件夹,新建jwt.py,代码如下:

from django.conf import settings
from django.http import JsonResponse
from django.contrib.auth import get_user_model
from django.core.exceptions import PermissionDenied
from utils.jwt_auth import parse_payload
  
 
def auth_permission_required():
    def decorator(view_func):
        def _wrapped_view(request, *args, **kwargs):
                try:
                    authorization = request.META.get('HTTP_AUTHORIZATION', '')
                    auth = authorization.split()
                except AttributeError:
                    return JsonResponse({"code": 401, "message": "No authenticate header"})
 
                # 用户通过API获取数据验证流程
                if not auth:
                    return JsonResponse({'error': '未获取到Authorization请求头', 'status': False})
                if auth[0].lower() != 'bearer':
                    return JsonResponse({'error': 'Authorization请求头中认证方式错误', 'status': False})
                if len(auth) == 1:
                    return JsonResponse({'error': "非法Authorization请求头", 'status': False})
                elif len(auth) > 2:
                    return JsonResponse({'error': "非法Authorization请求头", 'status': False})
                token = auth[1]
                result = parse_payload(token)
                if not result['status']:
                    return JsonResponse(result)
 
                return view_func(request, *args, **kwargs)
 
        return _wrapped_view
 
    return decorator

定义了装饰器就可以在view中引入它,在你需要权限验证的地方加上就可以了:

# 创建文章
@require_http_methods(["POST"])
@method_decorator(csrf_exempt, name='dispatch')
@auth_permission_required()
def create_article(request):
    response = {}
    try:
        data = json.loads(request.body)
        category = Category.objects.get(name=data.get('category'))
        name = Creater.objects.get(name=data.get('name'))
        article = Article()
        article.title = data.get('title')
        article.content = data.get('content')
        article.category = category
        article.name = name
        article.save()
        response['msg'] = 'success'
        response['error_num'] = 0
    except Exception as e:
        response['msg'] = str(e)
        response['error_num'] = 1

    return JsonResponse(response)
Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐