fetch 请求流式chunk 处理数据
摘要 本文介绍了一种基于Node.js的流式数据渲染方案,模拟ChatGPT的流式请求模式。通过不设置Content-Length头,服务器自动启用chunked transfer编码,实现边计算边传输数据的效果。文章提供了Node服务端代码示例,每100ms推送10条矩形框数据,以及前端使用fetch API处理流式响应的完整实现方案。重点解析了如何将二进制流转换为结构化数据的过程,并比较了fe
模拟一个业务场景,在一张超大的画布上渲染超级多的矩形框,或者类似 ChatGPT 流式数据请求渲染
- 采用类似chatGpt 流式网络请求的模式来加载数据,边加载边渲染。
这里用 node 简单模拟一个连段不断的发出 100 个矩形框的服务端。发送 HTTP/1.1 POST;只要不给 Content‑Length,浏览器自动转 chunked transfer‑encoding,使用chunk后端即可以边算边吐。
let http = require('http');
http.createServer((req, res) => {
if (req.url === '/data') {
res.writeHead(200, {
'Content-Type': 'application/json; charset=utf-8',
'Cache-Control': 'no-cache',
Connection: 'keep-alive', // optional,但习惯留
// 不写 Content-Length ⇒ 自动 chunked
});
// 假设你已经有 1000 条数据
const items = Array.from({ length: 1000 }, (_, i) => ({ id: i + 1, msg: `Hello #${i + 1}`,rect:[{x:1,y:2,height:10,width:10}] }));
// 每 10 条推一次,模拟耗时业务
let idx = 0;
const timer = setInterval(() => {
if (idx >= items.length) {
clearInterval(timer);
res.end(); // 结束连接
return;
}
const slice = items.slice(idx, idx += 10);
slice.forEach(obj => res.write(JSON.stringify(obj) + '\n')); // NDJSON 必须 \n
}, 100); // 100 ms 一批
} else {
res.writeHead(404).end();
}
}).listen(8088, () => console.log('🚀 http://localhost:8088/data'));
简单在终端测试:
curl http://localhost:8088/data
流式输出没有问题
//100 ms 输出一行
{"id":1,"msg":"Hello #1","rect":[{"x":1,"y":2,"height":10,"width":10}]}
....
{"id":1000,"msg":"Hello #1000","rect":[{"x":1,"y":2,"height":10,"width":10}]}
//end 最后一行结束
web端代码:
//发起请求
export const POST_BASIC_STREAM = async (data:any):Promise<any>=>{
const controller = new AbortController();
const res = await fetch(API.BASIC_PREDICT, {
method: 'POST',
headers: { 'Content-Type': 'application/json'},
// 如果你是 Chrome ≥118,想要边传 body 边收响应,可以加 duplex:'half'
body: JSON.stringify(data),
signal: controller.signal
});
if (!res.ok) throw new Error(res.statusText);
return [res, controller];
}
//此时res.body是一个可读流,完成字节到可读文本的解析即可
将原始的二进制流逐步解码、解析成结构化数据(JSON)。
import { EventSourceParserStream } from 'eventsource-parser/stream';
export async function* streamToIterator(responseBody: ReadableStream<Uint8Array>):AsyncGenerator{
const eventStream = responseBody
.pipeThrough(new TextDecoderStream())//将二进制流(Uint8Array)解码为字符串。
.pipeThrough(new EventSourceParserStream())//解析 Server-Sent Events (SSE) 格式的数据。
.getReader();//获取一个读取器,允许你逐块读取流中的内容。
while (true) {
const { value, done } = await eventStream.read();
if (done) {
yield { done: true, value: null}
break;
}
try{
const parsedData = JSON.parse(value.data)
yield { done: false, value: parsedData };
}catch (e) {
console.error('yield* iterable => parse 生成器数据异常');
}
}
}
streamToIterator
是一个将 ReadableStream<Uint8Array>
转换为异步生成器(AsyncGenerator)的函数,用于逐步读取流中的数据,并将其解析后产出。
//调用
const [res,controller] = await POST_BASIC_STREAM(postdata);
if(res && res.ok && res.body){
const dataStream = await streamToIterator(res.body);
for await (const chunk of dataStream) {
if(chunk.done){
console.log("end:",chunk);
}else{
console.log('ing:',chunk);
}
}
}
为什么选 fetch 来做流式,而不是用基于 XMLHttpRequest (XHR) 的 axios?
这是因为:XHR 天生没有「stream」型。无法高效的实现可读流,这种类似的场景下,不是一个最优的选择
Server-Sent Events 和 chunk 有什么区别?Chunk (HTTP chunked transfer coding)
→ 传输层技巧:把⼀个 HTTP 响应拆成 N 个“⻓度 + 数据”的⽚段,解决「返回前不知道总⻓度」的问题。浏览器收到后再拼回完整实体。
SSE (Server-Sent Events)
→ 应⽤层协议:在 text/event-stream 里约定 data:…\n\n
这类字段格式,再借助(常见的)chunked 把这些“事件”持续推给客户端;浏览器用 EventSource API 能解析、重连、按事件派发并且仅支持 get 请求方式。这里的具体使用可以翻看我之前的文章。当然。大多数 SSE 响应底层就用 chunked,但一个纯 chunked 流并不自动成为 SSE 流;只有遵守 text/event-stream 格式、客户端用 EventSource 才算真正的 SSE。
维度 | Chunked | SSE |
---|---|---|
单向/双向 | 只是⼀个响应体怎么传,方向由 HTTP 决定 | 天然单向:Server → Client |
重连机制 | 要⾃⼰写 JS、处理偏移量 (一般只读就行) | 浏览器自动带上 Last-Event-ID: 头帮你续传 |
常见场景 | ⻓视⻆录制、⼤⽂件下载、多数据实时输出、流式 AI 输出(写⾃定义协议) ——“实时生成” |
股票⾏情推送、ChatGPT 式增量补全、实时日志、进度条 |
代理/缓存坑位 | 是 hop-by-hop 头,CDN/反向代理可能把所有 chunk 合并再发;压缩后浏览器最多要等 ~1 KiB 才触发 progress 事件。(stackoverflow.com) |
同样依赖 chunk,但因每条事件通常几十字节 + 心跳,多数代理不会合并太久;再搭配自动重连容错性更好。 |
Generator 函数详解
Generator 是 JavaScript 中一种特殊的函数,用于控制函数的执行流程,并允许函数在执行过程中暂停和恢复。它通过 function*
语法定义,使用 yield
关键字来暂停函数的执行。
特点:
- 可中断:通过
yield
暂停函数执行。 - 可恢复:通过调用
.next()
方法继续执行。 - 返回一个迭代器对象(Iterator):可以逐步获取值。
- 支持异步操作(配合
async/await
可以实现异步流控制)。
基本结构
function* generatorFunction() {
yield value1;
yield value2;
return finalValue;
}
示例代码(同步)
function* simpleGenerator() {
console.log('Start');
yield 1;
yield 2;
yield 3;
console.log('End');
}
const gen = simpleGenerator();
console.log(gen.next()); // { value: 1, done: false }
console.log(gen.next()); // { value: 2, done: false }
console.log(gen.next()); // { value: 3, done: false }
console.log(gen.next()); // { value: undefined, done: true }
for (const item of simpleGenerator()) {
console.log(item); // 依次输出以上
}
异步 Generator 示例(结合 async/await
)
async function* asyncNumberGenerator() {
yield Promise.resolve(1);
yield new Promise(resolve => setTimeout(resolve, 1000, 2));
yield fetch('https://jsonplaceholder.typicode.com/todos/1').then(res => res.json());
}
(async () => {
for await (const item of asyncNumberGenerator()) {
console.log(item); // 依次输出 1, 2, { id: 1, ... }
}
})();
就是一个典型的 async function*
(异步生成器),用于将 ReadableStream<Uint8Array>
转换为一个逐块读取并产出解析后数据的异步迭代器。
它的工作流程如下:
- 接收一个流
responseBody
。 - 使用
TextDecoderStream
解码二进制数据为字符串。 - 使用
getReader()
获取流读取器。 - 通过
while (true)
循环不断调用await eventStream.read()
。 - 如果数据完成 (
done === true
),则yield { done: true }
。
总结
特性 | 描述 |
---|---|
控制执行 | 通过 yield 暂停,通过 .next() 继续 |
返回类型 | Generator 对象 / AsyncGenerator 对象 |
用途 | 实现惰性求值、流式处理、状态机等场景 |
更多推荐
所有评论(0)