SpringBoot 基于 OkHttp-SSE 实现服务端流式推送
·
一、概述
本文基于 OkHttp4.x + OkHttp-SSE 组件,实现跨服务 SSE 长连接流式数据推送,包含客户端发起 SSE 请求、服务端流式响应、前后端链路转发全流程,适配大模型问答、知识库流式输出、实时消息推送等业务场景。
二、技术栈
- 核心框架:SpringBoot
- HTTP 客户端:OkHttp 4.12.0
- SSE 组件:okhttp-sse
- 流式响应:Spring 内置 SseEmitter
- 序列化:FastJSON
三、项目依赖引入
<!-- OkHttp 核心HTTP客户端 -->
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>4.12.0</version>
</dependency>
<!-- SSE 长连接(服务器推送事件)支持 -->
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp-sse</artifactId>
<version>4.12.0</version>
</dependency>
四、全局 OkHttp 客户端配置
通过 Spring 配置类初始化单例 OkHttpClient,统一管理连接超时、重连、重定向等全局参数,交由 Spring 容器托管复用。
@Configuration
public class OkHttpConfig {
@Value("${okhttp.connect-timeout:10}")
private Integer connectTimeout;
@Value("${okhttp.read-timeout:0}")
private Integer readTimeout;
/**
* 全局 OkHttpClient 单例 Bean
* Spring 容器中唯一,全局复用
*/
@Bean
public OkHttpClient okHttpClient() {
Dispatcher dispatcher = new Dispatcher();
return new OkHttpClient.Builder()
.connectTimeout(connectTimeout, TimeUnit.SECONDS)
.readTimeout(readTimeout, TimeUnit.SECONDS)
.followRedirects(true)
.dispatcher(dispatcher)
.retryOnConnectionFailure(true)
//.hostnameVerifier(HttpsUtil.getHostnameVerifier())
//.sslSocketFactory(HttpsUtil.getSSLSocketFactory(), HttpsUtil.getX509TrustManager())
.build();
}
}
配置说明
- 连接超时:默认 10 秒
- 读取超时:0 代表长连接无超时限制
- 开启自动重定向、连接失败重试
五、SSE 客户端通用工具类
封装 OkHttp-SSE 监听逻辑,接收下游服务流式数据,转发至前端 SseEmitter,统一处理连接成功、消息接收、异常断开、正常关闭事件。
**
* SSE 长连接客户端(接收服务端实时推送)
*/
@Slf4j
@Component
public class SseEmitterUtil {
@Resource
private OkHttpClient client;
/**
* 建立SSE长连接,转发流式数据至前端
* @param request OkHttp请求实体
* @return 前端流式发射器
*/
public SseEmitter sseInvoke(Request request) {
// 1. 创建SseEmitter(永不超时)
SseEmitter emitter = new SseEmitter(0L);
// 2. 创建 SSE 客户端,监听远程消息
EventSource.Factory factory = EventSources.createFactory(client);
factory.newEventSource(request, new EventSourceListener() {
// 连接成功
@Override
public void onOpen(@NotNull EventSource eventSource, @NotNull Response response) {
System.out.println("SSE 连接成功");
}
// 收到服务端推送消息
@Override
public void onEvent(@NotNull EventSource eventSource,
@Nullable String id,
@Nullable String type,
@NotNull String data) {
System.out.println("收到推送:id=" + id + ", 内容=" + data);
try {
emitter.send(data);
} catch (IOException e) {
// 发送失败,关闭连接
//emitter.completeWithError(e);
//eventSource.cancel();
log.error("调用SseEmitter#send方法发生异常", e);
throw new RuntimeException(e.getMessage());
}
}
// 连接失败/断开
@Override
public void onFailure(@NotNull EventSource eventSource,
@Nullable Throwable t,
@Nullable Response response) {
System.err.println("SSE 连接失败:" + (t != null ? t.getMessage() : ""));
// 可选:自动重连
// connect(sseUrl);
Map<String, String> errorEntity = new HashMap<>();
errorEntity.put("code", String.valueOf(response.code()));
errorEntity.put("message", response.message());
try {
emitter.send(JSON.toJSONString(errorEntity));
} catch (IOException e) {
throw new RuntimeException(e);
}
emitter.complete();
eventSource.cancel();
}
// 正常关闭
@Override
public void onClosed(@NotNull EventSource eventSource) {
System.out.println("SSE 连接已关闭");
emitter.complete();
}
});
return emitter;
}
}
六、上游调用业务层
组装请求头、请求体,构建 OkHttp 请求对象,调用 SSE 工具类发起跨服务流式调用。
@Service
public class KnowledgeService {
private final String baseUrl = "http://localhost:8084";
private static final MediaType MEDIA_TYPE = MediaType.parse("application/json; charset=utf-8");
@Resource
private SseEmitterUtil sseEmitterUtil;
public SseEmitter sseKnowledge(KnowledgeBody po) {
RequestBody requestBody = RequestBody.create(MEDIA_TYPE, JSON.toJSONString(po));
Headers headers = new Headers.Builder()
.add("X-Bce-Request-ID", UUID.randomUUID().toString())
.add("Access-Key", "accessKey111")
.add("Token", "token111")
.build();
Request request = new Request.Builder()
.url(baseUrl + "/knowledge/streamKnowledge") // 参数放 URL
//.header("Accept", "text/event-stream") // ✅ 必须指定接收 SSE 格式
//.header("Cache-Control", "no-cache") // ✅ 禁用缓存
.headers(headers)
.post(requestBody)
.build();
return sseEmitterUtil.sseInvoke(request);
}
}
七、上游对外接口控制器
接收前端 POST 请求,调用业务层返回 SSE 流式响应。
@RestController
@RequestMapping("/knowledge")
public class KnowledgeController {
@Resource
private KnowledgeService knowledgeService;
@PostMapping("/sseKnowledge")
public SseEmitter sseKnowledge(@RequestBody KnowledgeBody po) {
return knowledgeService.sseKnowledge(po);
}
}
八、下游 SSE 服务端接口
接收上游请求,异步分批推送流式数据,模拟知识库分段返回效果,异常自动关闭连接。
/**
* 被调用方接口
* URL、Header、参数、返回格式 100% 匹配
*/
@RestController
@RequestMapping("/knowledge")
public class KnowledgeController {
/**
* SSE 流式接口 - 被调用方
*/
@PostMapping("/streamKnowledge")
public SseEmitter streamKnowledge(@RequestBody AIStudioKnowledgesBody body,
//@RequestHeader(value = "Accept", required = false) String accept,
@RequestHeader(value = "X-Bce-Request-ID", required = false) String requestId,
@RequestHeader(value = "Access-Key", required = false) String accessKey,
@RequestHeader(value = "Token", required = false) String token) {
// 创建 SSE 发射器,设置超时时间 30 秒
SseEmitter emitter = new SseEmitter(30000L);
// 异步处理业务逻辑
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
try {
// 模拟流式输出数据
String[] messages = {
"正在查询知识库...",
"找到 3 条相关记录",
"第一条:Spring Boot 配置详解",
"第二条:OkHttp 使用指南",
"第三条:SSE 服务端实现",
"查询完成!"
};
for (String msg : messages) {
Thread.sleep(500); // 模拟处理延迟
emitter.send(SseEmitter.event()
.id(UUID.randomUUID().toString())
.data(msg));
}
// 发送完成事件
emitter.send(SseEmitter.event()
.id("done")
.data("[DONE]"));
emitter.complete();
} catch (Exception e) {
System.err.println("SSE 处理异常: " + e.getMessage());
emitter.completeWithError(e);
} finally {
executor.shutdown();
}
});
return emitter;
}
}
更多推荐

所有评论(0)