一、概述

本文基于 OkHttp4.x + OkHttp-SSE 组件,实现跨服务 SSE 长连接流式数据推送,包含客户端发起 SSE 请求、服务端流式响应、前后端链路转发全流程,适配大模型问答、知识库流式输出、实时消息推送等业务场景。

二、技术栈

  1. 核心框架:SpringBoot
  2. HTTP 客户端:OkHttp 4.12.0
  3. SSE 组件:okhttp-sse
  4. 流式响应:Spring 内置 SseEmitter
  5. 序列化:FastJSON

三、项目依赖引入

		<!-- OkHttp 核心HTTP客户端 -->
        <dependency>
            <groupId>com.squareup.okhttp3</groupId>
            <artifactId>okhttp</artifactId>
            <version>4.12.0</version>
        </dependency>

        <!-- SSE 长连接(服务器推送事件)支持 -->
        <dependency>
            <groupId>com.squareup.okhttp3</groupId>
            <artifactId>okhttp-sse</artifactId>
            <version>4.12.0</version>
        </dependency>

四、全局 OkHttp 客户端配置

通过 Spring 配置类初始化单例 OkHttpClient,统一管理连接超时、重连、重定向等全局参数,交由 Spring 容器托管复用。

@Configuration
public class OkHttpConfig {

    @Value("${okhttp.connect-timeout:10}")
    private Integer connectTimeout;

    @Value("${okhttp.read-timeout:0}")
    private Integer readTimeout;

    /**
     * 全局 OkHttpClient 单例 Bean
     * Spring 容器中唯一,全局复用
     */
    @Bean
    public OkHttpClient okHttpClient() {
        Dispatcher dispatcher = new Dispatcher();
        return new OkHttpClient.Builder()
                .connectTimeout(connectTimeout, TimeUnit.SECONDS)
                .readTimeout(readTimeout, TimeUnit.SECONDS)
                .followRedirects(true)
                .dispatcher(dispatcher)
                .retryOnConnectionFailure(true)
                //.hostnameVerifier(HttpsUtil.getHostnameVerifier())
                //.sslSocketFactory(HttpsUtil.getSSLSocketFactory(), HttpsUtil.getX509TrustManager())
                .build();
    }
}

配置说明

  • 连接超时:默认 10 秒
  • 读取超时:0 代表长连接无超时限制
  • 开启自动重定向、连接失败重试

五、SSE 客户端通用工具类

封装 OkHttp-SSE 监听逻辑,接收下游服务流式数据,转发至前端 SseEmitter,统一处理连接成功、消息接收、异常断开、正常关闭事件。

**
 * SSE 长连接客户端(接收服务端实时推送)
 */
@Slf4j
@Component
public class SseEmitterUtil {

    @Resource
    private OkHttpClient client;

    /**
     * 建立SSE长连接,转发流式数据至前端
     * @param request OkHttp请求实体
     * @return 前端流式发射器
     */
    public SseEmitter sseInvoke(Request request) {
        // 1. 创建SseEmitter(永不超时)
        SseEmitter emitter = new SseEmitter(0L);

        // 2. 创建 SSE 客户端,监听远程消息
        EventSource.Factory factory = EventSources.createFactory(client);
        factory.newEventSource(request, new EventSourceListener() {

            // 连接成功
            @Override
            public void onOpen(@NotNull EventSource eventSource, @NotNull Response response) {
                System.out.println("SSE 连接成功");
            }

            // 收到服务端推送消息
            @Override
            public void onEvent(@NotNull EventSource eventSource,
                                @Nullable String id,
                                @Nullable String type,
                                @NotNull String data) {
                System.out.println("收到推送:id=" + id + ", 内容=" + data);
                try {
                    emitter.send(data);
                } catch (IOException e) {
                    // 发送失败,关闭连接
                    //emitter.completeWithError(e);
                    //eventSource.cancel();
                    log.error("调用SseEmitter#send方法发生异常", e);
                    throw new RuntimeException(e.getMessage());
                }
            }

            // 连接失败/断开
            @Override
            public void onFailure(@NotNull EventSource eventSource,
                                  @Nullable Throwable t,
                                  @Nullable Response response) {
                System.err.println("SSE 连接失败:" + (t != null ? t.getMessage() : ""));
                // 可选:自动重连
                // connect(sseUrl);
                Map<String, String> errorEntity = new HashMap<>();
                errorEntity.put("code", String.valueOf(response.code()));
                errorEntity.put("message", response.message());
                try {
                    emitter.send(JSON.toJSONString(errorEntity));
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
                emitter.complete();
                eventSource.cancel();
            }

            // 正常关闭
            @Override
            public void onClosed(@NotNull EventSource eventSource) {
                System.out.println("SSE 连接已关闭");
                emitter.complete();
            }
        });
        return emitter;
    }
}

六、上游调用业务层

组装请求头、请求体,构建 OkHttp 请求对象,调用 SSE 工具类发起跨服务流式调用。

@Service
public class KnowledgeService  {

    private final String baseUrl = "http://localhost:8084";
    private static final MediaType MEDIA_TYPE = MediaType.parse("application/json; charset=utf-8");

    @Resource
    private SseEmitterUtil sseEmitterUtil;

    public SseEmitter sseKnowledge(KnowledgeBody po) {
        RequestBody requestBody = RequestBody.create(MEDIA_TYPE, JSON.toJSONString(po));
        Headers headers = new Headers.Builder()
                .add("X-Bce-Request-ID", UUID.randomUUID().toString())
                .add("Access-Key", "accessKey111")
                .add("Token", "token111")
                .build();
        Request request = new Request.Builder()
                .url(baseUrl + "/knowledge/streamKnowledge")  // 参数放 URL
                //.header("Accept", "text/event-stream")  // ✅ 必须指定接收 SSE 格式
                //.header("Cache-Control", "no-cache")    // ✅ 禁用缓存
                .headers(headers)
                .post(requestBody)
                .build();
        return sseEmitterUtil.sseInvoke(request);
    }
}

七、上游对外接口控制器

接收前端 POST 请求,调用业务层返回 SSE 流式响应。

@RestController
@RequestMapping("/knowledge")
public class KnowledgeController {

    @Resource
    private KnowledgeService knowledgeService;

    @PostMapping("/sseKnowledge")
    public SseEmitter sseKnowledge(@RequestBody KnowledgeBody po) {
      return knowledgeService.sseKnowledge(po);
    }
}

八、下游 SSE 服务端接口

接收上游请求,异步分批推送流式数据,模拟知识库分段返回效果,异常自动关闭连接。

/**
 * 被调用方接口
 * URL、Header、参数、返回格式 100% 匹配
 */
@RestController
@RequestMapping("/knowledge")
public class KnowledgeController {

    /**
     * SSE 流式接口 - 被调用方
     */
    @PostMapping("/streamKnowledge")
    public SseEmitter streamKnowledge(@RequestBody AIStudioKnowledgesBody body,
                            //@RequestHeader(value = "Accept", required = false) String accept,
                            @RequestHeader(value = "X-Bce-Request-ID", required = false) String requestId,
                            @RequestHeader(value = "Access-Key", required = false) String accessKey,
                            @RequestHeader(value = "Token", required = false) String token) {
        // 创建 SSE 发射器,设置超时时间 30 秒
        SseEmitter emitter = new SseEmitter(30000L);

        // 异步处理业务逻辑
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(() -> {
            try {

                // 模拟流式输出数据
                String[] messages = {
                        "正在查询知识库...",
                        "找到 3 条相关记录",
                        "第一条:Spring Boot 配置详解",
                        "第二条:OkHttp 使用指南",
                        "第三条:SSE 服务端实现",
                        "查询完成!"
                };

                for (String msg : messages) {
                    Thread.sleep(500); // 模拟处理延迟
                    emitter.send(SseEmitter.event()
                            .id(UUID.randomUUID().toString())
                            .data(msg));
                }

                // 发送完成事件
                emitter.send(SseEmitter.event()
                        .id("done")
                        .data("[DONE]"));
                emitter.complete();

            } catch (Exception e) {
                System.err.println("SSE 处理异常: " + e.getMessage());
                emitter.completeWithError(e);
            } finally {
                executor.shutdown();
            }
        });
        return emitter;
    }
}

更多推荐