将在前面的话

fastapi默认的组件,官网上其实也有,关于权限什么的,在这里 FastAPI 安全性简介
但是我在学习过程中,发现他官方提供的默认组件,有以下缺陷(当然这些是我个人的一些看法,可能我FastAPI本身学习的不到位,没有get到他的一些知识点,如果有大佬能够有更好的解决方案,希望能放在评论区 ):

  1. 用户名,过期时间等关键信息,存储在客户端,并且使用默认的jwt,基本没有加密存在安全隐患
  2. 默认组件并不会刷新用户验证信息的过期时间,每次登录会话持续的时间是固定的,而不是随着用户的访问,刷新持续时间。
  3. 提供的功能并不完善,如果加入不同的权限,其实还是有很多工作量。
  4. 默认将授权码,放在了requests headers里,很多时候其实不太方便,主要是习惯了cookie,毕竟浏览器会自动携带cookie信息。

所以我参考flask-login,使用Fastapi依赖的方式,写了一个权限组件,考虑到这部分组件未来开发可能还是用得到的,所以特意上传一下。

第一节是使用起来的效果,后续是具体实现的过程,详细阅读的话建议把第一节放在最后看。

1. 在视图中使用

  • 首先导入实例化的权限管理对象
from ..permission import role_required
  • 当某个视图,需要某权限才能访问的时候,我们可以这样调用:
    通过fastapi框架的依赖,就可以先通过权限组件,获取当前的用户信息,通过传递不同的参数,锁定不同的权限需要,如果权限不足,则返回错误的状态码。
@router.get("/")
async def read_users(
    current_user: schemas.PyUser=Depends(role_required("管理员"))
):
    print(current_user)
    return [{"username": "Foo"}, {"username": "Bar"}]
  • 登录时,注册当前用户
    调用role_required.login(response, current_user),将当前用户注册进权限组件,这样当前用户访问其他需要权限的路由时,就会自动判断权限了
@router.post("/login/")
async def login(
    user:schemas.PyUserLogin, 
    response: Response,
    session: Session = Depends(get_db),
):
    dbuser = session.query(db.User).filter(db.User.username == db.User.username).first()
    if not (dbuser and dbuser.verify_password(user.password)):
        raise ApiException(
            code = 1001,
            message = "账号不存在或密码错误"
        )
    current_user = schemas.PyUser.from_orm(dbuser)
    role_required.login(response, current_user)
    return ApiResponse(
        code = 0,
        message = "登录成功",
        data = {
            "username": current_user.username,
            "realname": current_user.realname,
            "description": current_user.description
        }
    )
  • 登出时,注销当前用户
    登出时,调用role_required.logout(request)即可从权限组件中注销掉当前用户,此时组件中用户对应的cookie会立即移除,这样就去掉了用户的登录信息了
@router.post("/logout/")
async def logout(
    request: Request,
    _: schemas.PyUser=Depends(role_required("管理员"))
):
    current_user = role_required.logout(request)
    return ApiResponse(
        code = 0,
        message = "登出成功",
        data = {
            "username": current_user.username,
            "realname": current_user.realname,
            "description": current_user.description
        }
    )

2. 文件结构

在这里插入图片描述

权限定义

这部分的定义如果又需要的话,可以从数据库查询之类的,等等,这里只使用了最简单的定义
这部分代码在文件roles.py

roles = {
    "超级管理员": 1,
    "管理员": 2,
    "标注员": 3,
    "编辑者": 4,
    "审核员": 5,
    "游客": 6,
}

3. 权限组件类

权限组件核心类,代码在require.py
三个主要函数:

  1. login() 用户登录
  2. logout() 用户登出
  3. self() 定义访问需要的权限
  • 需要注意的是,我在出现权限不足时,返回的ApiException是一个本项目中自定义的错误状态,如有需要,自行更改。
  • 具体的使用方法在下面
from fastapi import HTTPException, Response, Request
import copy
from datetime import datetime, timedelta
import uuid
from starlette import status
from ..exception.apiexception import ApiResponse, ApiException

class RoleRequired:
    def __init__(
        self,
        guest,
        roles,
        redirect_url: str = None,
        expire_minutes: int = 30,
        clear_interval: int = 60,
    ):
        self.sessions = {}
        self.roles = roles
        self.guest = guest
        self.redirect_url = redirect_url
        self.expire_minutes = expire_minutes
        self.userclass = type(self.guest)
        self.last_clear_time = datetime.utcnow()
        self.interval = timedelta(minutes=clear_interval)


    def __clear_overstayed(self):
        now = datetime.utcnow()
        if (now - self.last_clear_time) < self.interval:
            return
        self.last_clear_time = now
        self.sessions = { k: v for k, v in self.sessions.items() if v['exp'] < now }


    def __create_token(self, response: Response, user=None):
        # print(self.sessions)
        self.__clear_overstayed()
        authorization = str(uuid.uuid1())
        response.set_cookie(key="authorization", value=authorization)
        if not user:
            user = copy.deepcopy(self.guest)
        self.sessions[authorization] = {
            "user": user,
            "exp": datetime.utcnow() + timedelta(minutes=self.expire_minutes)
        }
        return self.sessions[authorization]

    def __update_exp(self, authorization):
        exp = datetime.utcnow() + timedelta(minutes=self.expire_minutes)
        self.sessions[authorization]["exp"] = exp
        return self.sessions[authorization]

    def __verify_roles(self, user, roleids):
        if isinstance(roleids, list):
            return user.roleid in roleids
        return user.roleid == roleids


    def login(self, response: Response, user):
        # assert type(user) == type(self.guest)
        current_session = self.__create_token(response, user)
        return current_session['user']
    
    def logout(self, request: Request):
        authorization = request.cookies.get("authorization", None)
        try:
            current_session = self.sessions.pop(authorization)
            return current_session['user']
        except :
            raise ApiException(code=1005, message="当前未登录,登出发生错误")
        
    def __call__(self, *roles, **kwargs):
        login_only = kwargs.get("login_only", False)
        roles = [ self.roles[x] for x in roles ]
        if login_only:
            roles = list(self.roles.values())
        async def func(request: Request, response: Response) -> self.userclass:
            authorization = request.cookies.get("authorization", None)
            if not authorization or authorization not in self.sessions:
                current_session = self.__create_token(response)
            else:
                ntime = datetime.utcnow()
                session = self.sessions.get(authorization, None)
                if session['exp'] < ntime:
                    self.sessions.pop(authorization)
                    current_session = self.__create_token(response)
                else:
                    current_session = self.__update_exp(authorization)
            current_user = current_session['user']
            if not roles or self.__verify_roles(current_user, roles):
                return current_session['user']
            else:
                raise ApiException(
                    code=1004, message="权限不足"
                )
        return func

4. 实例化权限类

这部分代码在 权限包中__init__.py 文件中
实例化的主要参数如下:

  • guest:当没有登录的游客访问时,默认给他们的一个游客账户
  • roles:权限的字典,就是roles.py中定义的字典
  • redirect_url: str = None:没有登录或者没有权限时,跳转的url(这部分功能没实现,因为直接返回1004权限不足状态码了)
  • expire_minutes: int = 30:登录状态持续时间,过期的cookie将不再生效
  • clear_interval: int = 60:时间间隔超过60分钟时,如果有请求,将会清空一次过期的cookie
from .require import RoleRequired
from ..models.schemas import  users as schemas
from .roles import roles
from app.settings import SESSION_DURATION

role_required = RoleRequired(guest=schemas.PyUser( userid=0, username="guest", password="guest", realname="guest", roleid=6 ),
                             roles=roles,
                             expire_minutes=SESSION_DURATION)
Logo

鸿蒙生态一站式服务平台。

更多推荐