模拟一个业务场景,在一张超大的画布上渲染超级多的矩形框,或者类似 ChatGPT 流式数据请求渲染

  • 采用类似chatGpt 流式网络请求的模式来加载数据,边加载边渲染。

这里用 node 简单模拟一个连段不断的发出 100 个矩形框的服务端。发送 HTTP/1.1 POST;只要不给 Content‑Length,浏览器自动转 chunked transfer‑encoding,使用chunk后端即可以边算边吐。

let http = require('http');

http.createServer((req, res) => {
  if (req.url === '/data') {
    res.writeHead(200, {
      'Content-Type': 'application/json; charset=utf-8',
      'Cache-Control': 'no-cache',
      Connection: 'keep-alive',          // optional,但习惯留
      // 不写 Content-Length ⇒ 自动 chunked
    });

    // 假设你已经有 1000 条数据
    const items = Array.from({ length: 1000 }, (_, i) => ({ id: i + 1, msg: `Hello #${i + 1}`,rect:[{x:1,y:2,height:10,width:10}] }));

    // 每 10 条推一次,模拟耗时业务
    let idx = 0;
    const timer = setInterval(() => {
      if (idx >= items.length) {
        clearInterval(timer);
        res.end();                       // 结束连接
        return;
      }
      const slice = items.slice(idx, idx += 10);
      slice.forEach(obj => res.write(JSON.stringify(obj) + '\n')); // NDJSON 必须 \n
    }, 100);                             // 100 ms 一批
  } else {
    res.writeHead(404).end();
  }
}).listen(8088, () => console.log('🚀 http://localhost:8088/data'));

简单在终端测试:

curl http://localhost:8088/data

流式输出没有问题

//100 ms 输出一行
{"id":1,"msg":"Hello #1","rect":[{"x":1,"y":2,"height":10,"width":10}]}

....

{"id":1000,"msg":"Hello #1000","rect":[{"x":1,"y":2,"height":10,"width":10}]}
//end 最后一行结束

web端代码:

//发起请求
export const POST_BASIC_STREAM = async (data:any):Promise<any>=>{
  const controller = new AbortController();
  const res = await fetch(API.BASIC_PREDICT, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json'},
      // 如果你是 Chrome ≥118,想要边传 body 边收响应,可以加 duplex:'half'
      body: JSON.stringify(data),
      signal: controller.signal

    });
  if (!res.ok) throw new Error(res.statusText);
  return [res, controller];
}
//此时res.body是一个可读流,完成字节到可读文本的解析即可

将原始的二进制流逐步解码、解析成结构化数据(JSON)。

import { EventSourceParserStream } from 'eventsource-parser/stream';
export async function* streamToIterator(responseBody: ReadableStream<Uint8Array>):AsyncGenerator{
  const eventStream = responseBody
    .pipeThrough(new TextDecoderStream())//将二进制流(Uint8Array)解码为字符串。
    .pipeThrough(new EventSourceParserStream())//解析 Server-Sent Events (SSE) 格式的数据。
    .getReader();//获取一个读取器,允许你逐块读取流中的内容。
    while (true) {
      const { value, done } = await eventStream.read();
      if (done) {
        yield { done: true, value: null}
        break;
      }
      try{
		const parsedData = JSON.parse(value.data)
		yield { done: false, value: parsedData };
      }catch (e) {
        console.error('yield* iterable => parse 生成器数据异常');
      }
    }
}

streamToIterator 是一个将 ReadableStream<Uint8Array> 转换为异步生成器(AsyncGenerator)的函数,用于逐步读取流中的数据,并将其解析后产出。

//调用
const [res,controller] = await POST_BASIC_STREAM(postdata);
if(res && res.ok && res.body){
	const dataStream = await streamToIterator(res.body);
	for await (const chunk of dataStream) {
		if(chunk.done){
			console.log("end:",chunk);
		}else{
			console.log('ing:',chunk);
		}
	}
}

为什么选 fetch 来做流式,而不是用基于 XMLHttpRequest (XHR) 的 axios?
这是因为:XHR 天生没有「stream」型。无法高效的实现可读流,这种类似的场景下,不是一个最优的选择

Server-Sent Events 和 chunk 有什么区别?
Chunk (HTTP chunked transfer coding) → 传输层技巧:把⼀个 HTTP 响应拆成 N 个“⻓度 + 数据”的⽚段,解决「返回前不知道总⻓度」的问题。浏览器收到后再拼回完整实体。

SSE (Server-Sent Events) → 应⽤层协议:在 text/event-stream 里约定 data:…\n\n 这类字段格式,再借助(常见的)chunked 把这些“事件”持续推给客户端;浏览器用 EventSource API 能解析、重连、按事件派发并且仅支持 get 请求方式这里的具体使用可以翻看我之前的文章。当然。大多数 SSE 响应底层就用 chunked,但一个纯 chunked 流并不自动成为 SSE 流;只有遵守 text/event-stream 格式、客户端用 EventSource 才算真正的 SSE。

维度 Chunked SSE
单向/双向 只是⼀个响应体怎么传,方向由 HTTP 决定 天然单向:Server → Client
重连机制 要⾃⼰写 JS、处理偏移量 (一般只读就行) 浏览器自动带上 Last-Event-ID: 头帮你续传
常见场景 ⻓视⻆录制、⼤⽂件下载、多数据实时输出、流式 AI 输出(写⾃定义协议)
——“实时生成”
股票⾏情推送、ChatGPT 式增量补全、实时日志、进度条
代理/缓存坑位 hop-by-hop 头,CDN/反向代理可能把所有 chunk 合并再发;压缩后浏览器最多要等 ~1 KiB 才触发 progress 事件。(stackoverflow.com) 同样依赖 chunk,但因每条事件通常几十字节 + 心跳,多数代理不会合并太久;再搭配自动重连容错性更好。

Generator 函数详解

Generator 是 JavaScript 中一种特殊的函数,用于控制函数的执行流程,并允许函数在执行过程中暂停和恢复。它通过 function* 语法定义,使用 yield 关键字来暂停函数的执行。

特点:
  • 可中断:通过 yield 暂停函数执行。
  • 可恢复:通过调用 .next() 方法继续执行。
  • 返回一个迭代器对象(Iterator):可以逐步获取值。
  • 支持异步操作(配合 async/await 可以实现异步流控制)。

基本结构

function* generatorFunction() {
  yield value1;
  yield value2;
  return finalValue;
}

示例代码(同步)

function* simpleGenerator() {
  console.log('Start');
  yield 1;
  yield 2;
  yield 3;
  console.log('End');
}

const gen = simpleGenerator();

console.log(gen.next()); // { value: 1, done: false }
console.log(gen.next()); // { value: 2, done: false }
console.log(gen.next()); // { value: 3, done: false }
console.log(gen.next()); // { value: undefined, done: true }

for  (const item of simpleGenerator()) {
    console.log(item); // 依次输出以上
  }

异步 Generator 示例(结合 async/await

async function* asyncNumberGenerator() {
  yield Promise.resolve(1);
  yield new Promise(resolve => setTimeout(resolve, 1000, 2));
  yield fetch('https://jsonplaceholder.typicode.com/todos/1').then(res => res.json());
}

(async () => {
  for await (const item of asyncNumberGenerator()) {
    console.log(item); // 依次输出 1, 2, { id: 1, ... }
  }
})();

就是一个典型的 async function*(异步生成器),用于将 ReadableStream<Uint8Array> 转换为一个逐块读取并产出解析后数据的异步迭代器。

它的工作流程如下:

  1. 接收一个流 responseBody
  2. 使用 TextDecoderStream 解码二进制数据为字符串。
  3. 使用 getReader() 获取流读取器。
  4. 通过 while (true) 循环不断调用 await eventStream.read()
  5. 如果数据完成 (done === true),则 yield { done: true }

总结

特性 描述
控制执行 通过 yield 暂停,通过 .next() 继续
返回类型 Generator 对象 / AsyncGenerator 对象
用途 实现惰性求值、流式处理、状态机等场景

更多推荐