如何在 FastAPI 中使用服务器发送事件(SSE)?
简介 服务器发送事件 (SSE) 是一种无需重新加载页面即可将数据发送到浏览器的方法。这使您可以使用流数据并构建可用于各种场景的实时应用程序。 FastAPI 是一个 Python 框架,可以轻松构建 API。 在本教程中,我们将使用 FastAPI 创建一个简单的 SSE 服务器,它将每秒发送一条消息。 先决条件 为了学习本教程,您需要在您的机器上安装 Python 和 pip: https:/
简介
服务器发送事件 (SSE) 是一种无需重新加载页面即可将数据发送到浏览器的方法。这使您可以使用流数据并构建可用于各种场景的实时应用程序。
FastAPI 是一个 Python 框架,可以轻松构建 API。
在本教程中,我们将使用 FastAPI 创建一个简单的 SSE 服务器,它将每秒发送一条消息。
先决条件
为了学习本教程,您需要在您的机器上安装 Python 和 pip:
https://www.python.org/downloads/
安装 FastAPI
要安装 FastAPI 及其所有依赖项,可以使用以下命令:
pip install "fastapi[all]"
进入全屏模式 退出全屏模式
这还将包括用于运行服务器的uvicorn
服务器。
安装sse-starlette
安装 FastAPI 后,您可以安装sse-starlette
扩展来为您的 FastAPI 项目添加对 SSE 的支持:
pip install sse-starlette
进入全屏模式 退出全屏模式
让我们也将asyncio
添加到我们的项目中:
pip install asyncio
进入全屏模式 退出全屏模式
创建一个简单的 hello world 端点
安装 FastAPI 后,您可以创建一个简单的 hello world 端点以开始使用。
创建一个名为main.py
的新文件并添加以下代码:
import asyncio
import uvicorn
from fastapi import FastAPI, Request
app = FastAPI()
@app.get("/")
async def root():
return {"message": "Hello World"}
进入全屏模式 退出全屏模式
运行uvicorn
服务器
要运行服务器,可以使用以下命令:
uvicorn main:app --reload
进入全屏模式 退出全屏模式
这将在端口8000
上运行服务器。--reload
标志将在您更改代码时自动重新加载服务器,因此您不必每次进行更改时都重新启动服务器。
在浏览器中访问服务器,您应该会看到以下输出:
{
"message": "Hello World"
}
进入全屏模式 退出全屏模式
FastAPI 将自动生成一个/docs
端点,它将向您显示 API 文档。如果您要访问/docs
,您将看到以下内容:
[](https://res.cloudinary.com/practicaldev/image/fetch/s--sdyt-iOa--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://imgur .com/C1Qmszx.png)
为您的 FastAPI 项目添加 SSE 支持
接下来,让我们扩展main.py
文件以添加 SSE 支持。为此,您可以通过将以下行添加到您的main.py
文件来为您的项目添加 SSE 支持:
from sse_starlette.sse import EventSourceResponse
进入全屏模式 退出全屏模式
然后,您可以使用EventSourceResponse
类创建将发送 SSE 事件的响应。让我们创建一个每秒发送一个事件的新端点:
STREAM_DELAY = 1 # second
RETRY_TIMEOUT = 15000 # milisecond
@app.get('/stream')
async def message_stream(request: Request):
def new_messages():
# Add logic here to check for new messages
yield 'Hello World'
async def event_generator():
while True:
# If client closes connection, stop sending events
if await request.is_disconnected():
break
# Checks for new messages and return them to client if any
if new_messages():
yield {
"event": "new_message",
"id": "message_id",
"retry": RETRY_TIMEOUT,
"data": "message_content"
}
await asyncio.sleep(STREAM_DELAY)
return EventSourceResponse(event_generator())
进入全屏模式 退出全屏模式
现在,如果您在浏览器中访问/stream
端点,您将看到每秒发送一个事件,而无需重新加载页面。
带有流数据和 Materialise 的 FastAPI
要了解有关流数据的更多信息,您可以在此处查看本教程,了解如何将 FastAPI 与 Materialize 结合使用:
如何使用 FastAPI 和 Materialise 进行实时数据处理
本教程还包括一个演示项目,您可以运行该项目以了解它是如何工作的。
这是该项目的快速图表:
[](https://res.cloudinary.com/practicaldev/image/fetch/s--5PGHnPZj--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://user-images .githubusercontent.com/21223421/153422573-ef8d360e-4c31-42fa-ae8f-4327741659e7.png)
什么是 Materialise?
Materialize 是一个流式数据库,它获取来自不同来源(如 Kafka、PostgreSQL、S3 存储桶等)的数据,并允许用户编写聚合/物化该数据的视图,并允许您使用纯 SQL 以极低的延迟查询这些视图。
使用 Materialise 流式传输数据
对于演示项目,我们使用的是zwz100071 TAIL zwz100072 zwz100070
语句。TAIL
在发生更新时从源、表或视图中流式传输更新,这允许您在数据更新时查询数据,非常适合 SSE 示例。
以下是使用TAIL
流式传输数据的/stream
端点的代码:
@app.get('/stream')
async def message_stream(request: Request):
def new_messages():
# Check if data in table
results = engine.execute('SELECT count(*) FROM sensors_view_1s')
if results.fetchone()[0] == 0:
return None
else:
return True
async def event_generator():
while True:
# If client was closed the connection
if await request.is_disconnected():
break
# Checks for new messages and return them to client if any
if new_messages():
connection = engine.raw_connection()
with connection.cursor() as cur:
cur.execute("DECLARE c CURSOR FOR TAIL sensors_view_1s")
cur.execute("FETCH ALL c")
for row in cur:
yield row
await asyncio.sleep(MESSAGE_STREAM_DELAY)
return EventSourceResponse(event_generator())
进入全屏模式 退出全屏模式
如您所见,我们刚刚扩展了new_message
函数来检查sensors_view_1s
视图中是否有任何新消息。如果没有新消息,我们将返回None
并且EventSourceResponse
不会发送任何事件。如果有新消息,我们将返回True
,EventSourceResponse
将发送新消息。
然后在event_generator
异步函数中,我们使用TAIL
和FETCH ALL
语句来获取sensors_view_1s
视图中的所有消息。我们正在使用DECLARE CURSOR
语句来创建一个游标,该游标将在数据更新时流式传输数据。
结论
要了解有关 FastAPI 的更多信息,请查看FastAPI 文档。
有关如何将 FastAPI 与 Materialize 一起使用的更多信息,请查看此教程。
要了解有关 Materialise 的更多信息,请查看Materialize 文档。
更多推荐
所有评论(0)