简介

服务器发送事件 (SSE) 是一种无需重新加载页面即可将数据发送到浏览器的方法。这使您可以使用流数据并构建可用于各种场景的实时应用程序。

FastAPI 是一个 Python 框架,可以轻松构建 API。

在本教程中,我们将使用 FastAPI 创建一个简单的 SSE 服务器,它将每秒发送一条消息。

先决条件

为了学习本教程,您需要在您的机器上安装 Python 和 pip:

https://www.python.org/downloads/

安装 FastAPI

要安装 FastAPI 及其所有依赖项,可以使用以下命令:

pip install "fastapi[all]"

进入全屏模式 退出全屏模式

这还将包括用于运行服务器的uvicorn服务器。

安装sse-starlette

安装 FastAPI 后,您可以安装sse-starlette扩展来为您的 FastAPI 项目添加对 SSE 的支持:

pip install sse-starlette

进入全屏模式 退出全屏模式

让我们也将asyncio添加到我们的项目中:

pip install asyncio

进入全屏模式 退出全屏模式

创建一个简单的 hello world 端点

安装 FastAPI 后,您可以创建一个简单的 hello world 端点以开始使用。

创建一个名为main.py的新文件并添加以下代码:

import asyncio
import uvicorn
from fastapi import FastAPI, Request

app = FastAPI()


@app.get("/")
async def root():
    return {"message": "Hello World"}

进入全屏模式 退出全屏模式

运行uvicorn服务器

要运行服务器,可以使用以下命令:

uvicorn main:app --reload

进入全屏模式 退出全屏模式

这将在端口8000上运行服务器。--reload标志将在您更改代码时自动重新加载服务器,因此您不必每次进行更改时都重新启动服务器。

在浏览器中访问服务器,您应该会看到以下输出:

{
    "message": "Hello World"
}

进入全屏模式 退出全屏模式

FastAPI 将自动生成一个/docs端点,它将向您显示 API 文档。如果您要访问/docs,您将看到以下内容:

[FastAPI docs 端点](https://res.cloudinary.com/practicaldev/image/fetch/s--sdyt-iOa--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://imgur .com/C1Qmszx.png)

为您的 FastAPI 项目添加 SSE 支持

接下来,让我们扩展main.py文件以添加 SSE 支持。为此,您可以通过将以下行添加到您的main.py文件来为您的项目添加 SSE 支持:

from sse_starlette.sse import EventSourceResponse

进入全屏模式 退出全屏模式

然后,您可以使用EventSourceResponse类创建将发送 SSE 事件的响应。让我们创建一个每秒发送一个事件的新端点:

STREAM_DELAY = 1  # second
RETRY_TIMEOUT = 15000  # milisecond

@app.get('/stream')
async def message_stream(request: Request):
    def new_messages():
        # Add logic here to check for new messages
        yield 'Hello World'
    async def event_generator():
        while True:
            # If client closes connection, stop sending events
            if await request.is_disconnected():
                break

            # Checks for new messages and return them to client if any
            if new_messages():
                yield {
                        "event": "new_message",
                        "id": "message_id",
                        "retry": RETRY_TIMEOUT,
                        "data": "message_content"
                }

            await asyncio.sleep(STREAM_DELAY)

    return EventSourceResponse(event_generator())

进入全屏模式 退出全屏模式

现在,如果您在浏览器中访问/stream端点,您将看到每秒发送一个事件,而无需重新加载页面。

带有流数据和 Materialise 的 FastAPI

要了解有关流数据的更多信息,您可以在此处查看本教程,了解如何将 FastAPI 与 Materialize 结合使用:

如何使用 FastAPI 和 Materialise 进行实时数据处理

本教程还包括一个演示项目,您可以运行该项目以了解它是如何工作的。

这是该项目的快速图表:

[FastAPI with Materialize](https://res.cloudinary.com/practicaldev/image/fetch/s--5PGHnPZj--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://user-images .githubusercontent.com/21223421/153422573-ef8d360e-4c31-42fa-ae8f-4327741659e7.png)

什么是 Materialise?

Materialize 是一个流式数据库,它获取来自不同来源(如 Kafka、PostgreSQL、S3 存储桶等)的数据,并允许用户编写聚合/物化该数据的视图,并允许您使用纯 SQL 以极低的延迟查询这些视图。

使用 Materialise 流式传输数据

对于演示项目,我们使用的是zwz100071 TAIL zwz100072 zwz100070语句。TAIL在发生更新时从源、表或视图中流式传输更新,这允许您在数据更新时查询数据,非常适合 SSE 示例。

以下是使用TAIL流式传输数据的/stream端点的代码:

@app.get('/stream')
async def message_stream(request: Request):
    def new_messages():
        # Check if data in table
        results = engine.execute('SELECT count(*) FROM sensors_view_1s')
        if results.fetchone()[0] == 0:
            return None
        else:
            return True

    async def event_generator():
        while True:
            # If client was closed the connection
            if await request.is_disconnected():
                break

            # Checks for new messages and return them to client if any
            if new_messages():
                connection = engine.raw_connection()
                with connection.cursor() as cur:
                    cur.execute("DECLARE c CURSOR FOR TAIL sensors_view_1s")
                    cur.execute("FETCH ALL c")
                    for row in cur:
                        yield row

            await asyncio.sleep(MESSAGE_STREAM_DELAY)

    return EventSourceResponse(event_generator())

进入全屏模式 退出全屏模式

如您所见,我们刚刚扩展了new_message函数来检查sensors_view_1s视图中是否有任何新消息。如果没有新消息,我们将返回None并且EventSourceResponse不会发送任何事件。如果有新消息,我们将返回True,EventSourceResponse将发送新消息。

然后在event_generator异步函数中,我们使用TAILFETCH ALL语句来获取sensors_view_1s视图中的所有消息。我们正在使用DECLARE CURSOR语句来创建一个游标,该游标将在数据更新时流式传输数据。

结论

要了解有关 FastAPI 的更多信息,请查看FastAPI 文档。

有关如何将 FastAPI 与 Materialize 一起使用的更多信息,请查看此教程。

要了解有关 Materialise 的更多信息,请查看Materialize 文档。

Logo

学AI,认准AI Studio!GPU算力,限时免费领,邀请好友解锁更多惊喜福利 >>>

更多推荐