保护 REST API 并不总是那么容易。在这篇文章中,我们将讨论如何使用JSON Web 令牌 (JWT)来做到这一点。

先决条件

要使用 JWT 了解 Falcon REST API 的安全实施,应该了解 Falcon、REST 以及最后但并非最不重要的 JSON Web 令牌 (JWT)。

简介

话虽如此,由于 REST API 背后的性质和机制,保护它们并不总是那么简单。用户提交凭据后会发生什么?你怎么知道他们已经正确登录了他们的后续请求?你不能在你的服务器端保持一个状态来发出信号。所以你会怎么做?

在本文中,我想与您分享一种非常强大但简单的方法来实现这一点:使用 JSON Web Tokens。在这篇文章中,我们将在我们的 Falcon API 应用程序中实现 JWT 身份验证。我将参考我之前文章中的 /student API 示例,您可以在此处找到。

与我们的安全 API 的任何交互都将从登录请求开始。它看起来像这样:

POST /login

有效载荷:

{
    “Username”: “shantanu”
    “Password”: “password”
}

验证凭证后,系统将返回一个新的 JSON Web Token。其中将包括以下有效负载:

{
    "user_id": user.id
}

为简单起见,我们将使用 "HS256" 算法对数据进行编码,这意味着我们将在客户端和 API 上使用相同的密钥。

出于本示例的目的,我们的秘密将是:

secret

认证机制

为了对用户进行身份验证并生成 JWT,我们将创建一个路由 /login ,它将接受用户名和密码,并在使用数据库验证这些凭据后,它将生成一个 JWT 并返回它以响应请求。

python的### jwt模块

对于 JWT 的编码和解码,我们将使用 python 的 jwt 模块。该模块包含在 python 中,因此您无需单独安装。

创建用户模型

为了验证用户凭据,我们需要检查具有这些凭据的用户是否存在于数据库中。为此,我们将首先创建一个用户模型。

使用以下代码创建UserModel.py文件:

from sqlobject import *
from Connection import conn


class User(SQLObject):
    _connection = conn
    username = StringCol(length=30, notNone=True, unique=True)
    password = StringCol(length=30, notNone=True)


User.createTable(ifNotExists=True)

用户模型具有与数据库中用户表中的列映射的属性。我们将全局数据库连接对象传递给模型,以便它可以与数据库通信。

定义用户存储库

现在我们将定义一个将查询数据库的用户存储库。它将在 where 子句中执行带有用户名和密码的 select 语句。它将在用户模型的帮助下对数据库执行查询。

使用以下代码创建UserModel.py文件:

from UserModel import User
from sqlobject import *


class UserRepository:
    @staticmethod
    def get_user(username, password):
        return User.select(AND(User.q.username == username, User.q.password == password))

创建登录服务(Request Handler)

现在我们将定义一个登录服务,它将处理来自客户端的登录请求,并通过调用用户存储库提供 JWT 作为响应。

使用以下代码创建LoginService.py文件:

import falcon
import jwt
import json

from UserRepository import UserRepository


class LoginService:
    def __init__(self):
        pass

    def login(self, req, resp):
        req_params = json.loads(req.stream.read())
        print("Attempting Login")

        if not req_params or not req_params["Username"] or not req_params["Password"]:
            raise falcon.HTTPBadRequest("Bad Request", "Please enter valid Username and Password")
        else:
            print("authenticating with username: {} and password: {}".format(req_params["Username"], req_params["Password"]))
            self._authenticate(req_params["Username"], req_params["Password"], req, resp)

    def _authenticate(self, username, password, req, resp):
        if not username or not password:
            raise falcon.HTTPBadRequest("Bad Request", "Please enter valid Username and Password")
        else:
            print("Fetching User Form DB")
            users = UserRepository.get_user(username, password)
            print("User Info: {}".format(users))
            if users.count() > 0:
                user = users[0]
                payload = {
                    "user_id": user.id
                }
                secret = "secret"
                algo = "HS256"
                token = jwt.encode(payload=payload, key=secret, algorithm=algo)
                print(token)
                resp.media = {
                    "token": token.decode("utf-8")
                }
                resp.status = falcon.HTTP_200
            else:
                raise falcon.HTTPUnauthorized("Unauthorized", "Invalid Credentials")

    def on_post(self, req, resp):
        self.login(req, resp)


def main():
    pass


if __name__ == "__main__": main()

在登录服务中,我们只导入了 falcon、jwt 和 json 模块。我们还导入了 USerRepository,它将用于验证数据库中的用户凭据。我们在这里实现了一个 on_post http 方法,它将处理来自客户端的 POST 请求,并在验证请求后返回一个 jwt 作为响应。 on_post 方法在内部调用 login 方法,该方法将提取请求有效负载并从中获取用户名和密码。获取用户名和密码后,它调用 authnticate 方法,该方法在用户存储库的帮助下使用数据库实际验证用户凭据,并生成适当的响应。这里我们使用 jwt.encode() 方法来生成 jwt。在我们的例子中,我们已经传递了有效载荷,即 {user_id:""},在我们的例子中,密钥即“秘密”,我们决定使用的算法是“HS256”。使用这些将生成类似于以下示例 jwt 令牌的 jwt。如果凭据有效,它将生成一个 jwt,其中包括有效负载中的 user_id 并返回,否则如果凭据无效,它将返回 HTTPUnauthorized 请求错误。

eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJ1c2VyX2lkIjoxfQ.J_RIIkoOLNXtd5IZcEwaBDGKGA3VnnYmuXnmhsmDEOs

同时,客户端应用程序发送的任何进一步请求都将包含相同的令牌,而服务器将通过每次重新签名并将结果与令牌的签名部分进行比较来验证该令牌。

在典型的 JWT 请求中,您将在客户端必须登录后将令牌作为授权标头的一部分传递,如下所示:

Authorization: Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJ1c2VyX2lkIjoxfQ.J_RIIkoOLNXtd5IZcEwaBDGKGA3VnnYmuXnmhsmDEOs

定义一个中间件来验证请求

现在我们将定义一个中间件,它将拦截到达我们应用程序的每个请求,并检查请求是否在授权标头中包含 JWT。它提取 JWT 并检查其是否有效。如果不是有效的,则请求将被拒绝,并且将发送未经授权的响应。如果 JWT 有效,则请求将被向前处理。它解码 JWT 并提取有效负载,并从有效负载中验证 user_id。

使用以下代码创建一个AuthHandler.py文件:

import falcon
import jwt
import sqlobject.dberrors
from UserModel import User


class AuthHandler:
    def process_request(self, req, resp):
        if "/login" in req.path:
            return

        if req.get_header("Authorization"):
            auth_header = req.get_header("Authorization").split(" ")
            token = auth_header[1]
            if token:
                if not self._is_token_valid(token):
                    description = "The provided auth token is not valid.Please request a new token and try again."
                    raise falcon.HTTPUnauthorized("Unauthorized", description)
        else:
            description = "The provided auth token is not valid.Please request a new token and try again."
            raise falcon.HTTPUnauthorized("Unauthorized", description)

    def _is_token_valid(self, token):
        try:
            payload = jwt.decode(jwt=token, key="secret", algorithms="HS256")
            userid = payload["user_id"]
            user = User.get(payload['user_id'])
            print("Authenticated, User : {}".format(user))
            return True
        except (jwt.DecodeError, jwt.ExpiredSignatureError, sqlobject.dberrors.Error):
            return False

在这里,AuthHandler 实现了 process_request 方法,一旦发出请求,该方法就会被执行。 process_request 方法将检查请求路径,如果它不是“/login”,那么它将检查授权标头。如果 Authorization 标头不存在,则请求将被拒绝并出现 Uathorized 错误。然后它将从 Authorization 标头中提取 JWT 并对其进行验证,并使用 jwt.decode() 方法从中提取信息。如果 JWT 有效,则只会进一步处理请求,否则将返回未经授权的错误。

注册我们应用程序的登录路由

在我们的app.py中导入 LoginService。

from LoginService import LoginService

在我们的应用程序中添加路由“/login”。

app.add_route("/login", LoginService())

在你的Application中注册AuthHandler作为中间件

现在我们将通过简单地使用 falcon.App() 的 add_middleware() 将我们的 AuthHandler 类注册为 out 应用程序的中间件。通过这样做,我们将确保到达我们应用程序的每个请求都应该被拦截并且是安全的。

在您的app.py中导入 AuthHandler 。

from AuthHandler import AuthHandler

并添加以下行以将 AuthHandler 注册为中间件。

app.add_middleware(AuthHandler())

您的项目目录应如下所示:

image.png

运行应用

使用以下命令运行服务器。

gunicorn --reload app:app

应用程序运行后,我们可以调用在localhost:8000上提供的 REST API。首先,我们将调用“/login” API 来获取 JWT,并使用该 JWT 我们将尝试调用其他 API。我们将在我们的 API 请求中添加一个 Authorization 标头作为 Bearer 令牌,如下所示。

Authorization: Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJ1c2VyX2lkIjoxfQ.J_RIIkoOLNXtd5IZcEwaBDGKGA3VnnYmuXnmhsmDEOs

在添加授权标头之前,只需尝试调用 get Student API,它应该会给出一个未经授权的错误,如下所示。

{
    "title": "Unauthorized",
    "description": "The provided auth token is not valid.Please request a new token and try again."
}

现在我们将尝试添加授权标头,它应该给出 200 OK 响应,并带有一些数据作为响应。

可以参考这个Postman合集进行测试。

是的,我们完成了。我们刚刚使用 JWT 保护了我们的 Falcon REST API。

实现-成就.gif

Logo

学AI,认准AI Studio!GPU算力,限时免费领,邀请好友解锁更多惊喜福利 >>>

更多推荐