Spring AI Alibaba 1.x 系列【6】ReactAgent 同步执行 & 流式执行
上一篇我们深度解析了 ReactAgent 构建器(Builder/DefaultBuilder)的源码逻辑,完成 Agent 实例化后,核心就是通过执行方法触发智能体的推理-行动-观察闭环。
文章目录
1. 前言
上一篇我们深度解析了 ReactAgent 构建器(Builder/DefaultBuilder)的源码逻辑,完成 Agent 实例化后,核心就是通过执行方法触发智能体的推理-行动-观察闭环。
本文将聚焦 ReactAgent 的两大核心执行方式,即:
- 同步执行:同步方法阻塞等待结果,适用于不需要实时反馈、追求结果完整性的场景
- 流式执行:通过
Flux实时推送结果,适用于需要实时展示结果的场景
2. 同步执行
同步执行是 ReactAgent 最常用的执行方式,核心特点是阻塞式执行,即调用方法后,需等待智能体完成全流程(推理、工具调用、结果生成),才能获取最终回复。
2.1 call()
ReactAgent 提供 8 个重载的 call() 方法,均返回 AssistantMessage(助手回复消息),所有重载方法最终均委托给 doMessageInvoke() 执行,覆盖从简单字符串输入到复杂多轮对话、自定义参数输入的全场景,适配不同业务需求。

2.1.1 源码解析
最简调用方式 - 字符串输入:
/**
* 最简调用方式 - 字符串输入
*
* 将字符串转换为 UserMessage 并执行 agent,返回 AI 的回复。
* 适用于简单的单次对话场景(如:单次提问、简单指令)。
*
* @param message 用户输入的字符串消息
* @return AI 生成的 AssistantMessage 回复
* @throws GraphRunnerException 如果图执行过程中发生错误
*/
public AssistantMessage call(String message) throws GraphRunnerException {
// 字符串消息,无配置 → 委托给核心执行方法
return doMessageInvoke(message, null);
}
字符串输入 + 配置控制:
/**
* 字符串输入 + 配置控制
*
* 允许通过 RunnableConfig 控制执行行为,如:
*
* threadId: 设置线程 ID,用于状态持久化(断点续跑)
* metadata: 传递额外的元数据(如用户ID、请求ID)
* timeout: 设置执行超时时间(避免阻塞过久)
*
* 适用于需要控制执行行为的单次对话场景。
*
* @param message 用户输入的字符串消息
* @param config 运行时配置,控制执行行为
* @return AI 生成的 AssistantMessage 回复
* @throws GraphRunnerException 如果图执行过程中发生错误
*/
public AssistantMessage call(String message, RunnableConfig config) throws GraphRunnerException {
// 字符串消息 + 自定义执行配置 → 委托核心执行方法
return doMessageInvoke(message, config);
}
UserMessage 输入:
/**
* UserMessage 输入 - 支持复杂消息构造
*
* UserMessage 可以包含:
*
* 文本内容
* 多媒体内容 (图片、文件等)
* 元数据和注释
*
* 适用于包含多媒体、复杂元数据的单次对话场景(如:图片识别提问)。
*
* @param message 构造好的 UserMessage 实例
* @return AI 生成的 AssistantMessage 回复
* @throws GraphRunnerException 如果图执行过程中发生错误
*/
public AssistantMessage call(UserMessage message) throws GraphRunnerException {
// 复杂消息对象,无配置 → 委托核心执行方法
return doMessageInvoke(message, null);
}
UserMessage 输入+ 配置控制:
/**
* UserMessage 输入 + 配置控制
*
* @param message 构造好的 UserMessage 实例
* @param config 运行时配置,控制执行行为
* @return AI 生成的 AssistantMessage 回复
* @throws GraphRunnerException 如果图执行过程中发生错误
*/
public AssistantMessage call(UserMessage message, RunnableConfig config) throws GraphRunnerException {
// 复杂消息对象 + 自定义执行配置 → 委托核心执行方法
return doMessageInvoke(message, config);
}
消息列表输入:
/**
* 消息列表输入 - 支持多轮对话
*
* 传入完整的消息历史,适用于:
*
* 多轮对话场景(如:连续提问、上下文关联对话)
* 需要保持上下文的对话(如:聊天机器人)
* 恢复之前的对话状态(结合 threadId 持久化)
*
* @param messages 消息历史列表(包含 UserMessage、AssistantMessage 交替)
* @return AI 生成的 AssistantMessage 回复
* @throws GraphRunnerException 如果图执行过程中发生错误
*/
public AssistantMessage call(List<Message> messages) throws GraphRunnerException {
// 多轮对话消息列表,无配置 → 委托核心执行方法
return doMessageInvoke(messages, null);
}
消息列表输入 + 配置控制:
/**
* 消息列表输入 + 配置控制
*
* @param messages 消息历史列表
* @param config 运行时配置,控制执行行为
* @return AI 生成的 AssistantMessage 回复
* @throws GraphRunnerException 如果图执行过程中发生错误
*/
public AssistantMessage call(List<Message> messages, RunnableConfig config) throws GraphRunnerException {
// 多轮对话消息列表 + 自定义执行配置 → 委托核心执行方法
return doMessageInvoke(messages, config);
}
自定义 Map 参数输入:
/**
* 自定义Map参数输入 - 支持扩展输入
*
* 可传入自定义键值对参数,除固定保留键外,其他键会作为图状态传递
* 保留关键字:messages、input,用于传递用户问题/输入
* 其他自定义键:可用于提示词占位符、自定义状态值(如:用户角色、业务参数)
* 适用于高级扩展场景(如:自定义提示词占位符替换、业务参数透传)。
*
* @param inputs 输入参数Map(保留关键字:messages、input)
* @return AI 生成的 AssistantMessage 回复
* @throws GraphRunnerException 图执行失败时抛出
*/
public AssistantMessage call(Map<String, Object> inputs) throws GraphRunnerException {
// 自定义Map输入,无配置 → 委托核心执行方法
return doMessageInvoke(inputs, null);
}
自定义 Map 参数+ 配置控制输入:
/**
* 自定义Map参数输入 + 配置控制
*
* 可传入自定义键值对参数,除固定保留键外,其他键会作为图状态传递
* 保留关键字:messages、input,用于传递用户问题/输入
*
* @param inputs 输入参数Map(保留关键字:messages、input)
* @param config 运行时配置,控制执行行为
* @return AI 生成的 AssistantMessage 回复
* @throws GraphRunnerException 图执行失败时抛出
*/
public AssistantMessage call(Map<String, Object> inputs, RunnableConfig config) throws GraphRunnerException {
// 自定义Map输入 + 执行配置 → 委托核心执行方法
return doMessageInvoke(inputs, config);
}
所有 call() 重载方法均委托给 doMessageInvoke() 执行,该方法负责统一处理消息格式、调用底层执行逻辑、提取最终回复。
提供两个重载,分别处理任意类型消息和自定义 Map 消息:
/**
* 通用对象类型消息执行入口
* 接收任意类型消息(String/UserMessage/List<Message>),统一构建为Map输入
* @param message 任意类型用户消息(字符串/单消息/消息列表)
* @param config 执行配置(可为null)
* @return 最终AI回复消息
* @throws GraphRunnerException 执行图异常
*/
private AssistantMessage doMessageInvoke(Object message, RunnableConfig config) throws GraphRunnerException {
// 1. 将任意类型消息统一构建为标准Map输入格式(适配底层执行图要求)
Map<String, Object> inputs = buildMessageInput(message);
// 2. 调用底层真正执行方法(doInvoke),执行Agent全流程(推理→工具调用→结果生成)
// 3. 从执行结果(全局状态)中提取标准的AssistantMessage返回
return extractAssistantMessage(doInvoke(inputs, config));
}
/**
* 自定义Map类型消息执行入口
* 直接接收已构建好的Map参数,跳过消息构建步骤,用于高级场景
* @param inputs 自定义输入Map(包含messages/input等状态)
* @param config 执行配置
* @return 最终AI回复消息
* @throws GraphRunnerException 执行图异常
*/
private AssistantMessage doMessageInvoke(Map<String, Object> inputs, RunnableConfig config) throws GraphRunnerException {
// 直接执行Agent流程,并提取结果返回(跳过消息格式转换,提升效率)
return extractAssistantMessage(doInvoke(inputs, config));
}
extractAssistantMessage() 方法负责从 Agent 执行后的全局状态(OverAllState)中提取 AssistantMessage,支持两种提取策略,适配不同配置场景,同时包含异常处理,保证回复提取的稳定性。
源码如下:
/**
* 从全局状态中提取助手消息
* 支持两种提取策略:
* 1. 若指定了outputKey,直接通过key获取对应助手消息(适用于自定义输出键场景)
* 2. 未指定key时,从messages状态列表中获取【最后一条】助手消息(默认策略)
* @param state 全局状态对象(Optional包装,允许为空)
* @return 提取到的AssistantMessage
* @throws IllegalStateException 指定outputKey但状态中无对应值时抛出
* @throws AgentException 未找到任何有效的AssistantMessage时抛出
*/
private AssistantMessage extractAssistantMessage(Optional<OverAllState> state) {
// 策略1:配置了outputKey,直接通过指定key从状态中提取消息
if (StringUtils.hasLength(outputKey)) {
return state.flatMap(s -> s.value(outputKey))
// 将状态值强转为AssistantMessage类型(确保类型安全)
.map(msg -> (AssistantMessage) msg)
// 未找到指定key的消息,抛出非法状态异常(快速定位配置问题)
.orElseThrow(() -> new IllegalStateException("Output key " + outputKey + " not found in agent state"));
}
// 策略2:未配置outputKey,从messages列表中提取最后一条AssistantMessage(符合多轮对话逻辑)
// 消息转换时增加校验逻辑,避免类型转换异常
return state.flatMap(s -> s.value("messages"))
.stream()
// 将状态中的messages对象强转为列表,并流式处理
.flatMap(messageList -> ((List<?>) messageList).stream()
// 过滤出所有AssistantMessage类型的消息(排除UserMessage等其他类型)
.filter(msg -> msg instanceof AssistantMessage)
.map(msg -> (AssistantMessage) msg))
// 取列表中【最后一条】助手消息(reduce((a,b)->b) 经典取最后一个元素用法)
.reduce((first, second) -> second)
// 未找到任何助手消息,抛出自定义异常(明确执行失败原因)
.orElseThrow(() -> new AgentException("No AssistantMessage found in 'messages' state"));
}
2.1.2 演示案例
案例 1 ,最简字符串输入(单次对话)
// 1. 假设已构建ReAct Agent实例
ReactAgent agent = new DefaultBuilder()
.name("simple-agent")
.model(chatModel) // 已配置ChatModel(如DeepSeek、OpenAI)
.systemPrompt("你是一个简单的问答助手,直接回答用户问题")
.build();
// 2. 最简调用:字符串输入
try {
AssistantMessage response = agent.call("Spring AI是什么?");
// 输出AI回复内容
System.out.println("AI回复:" + response.getContent());
} catch (GraphRunnerException e) {
// 处理执行异常(如模型调用失败、工具执行异常)
System.err.println("Agent执行失败:" + e.getMessage());
}
案例 2 ,字符串输入 + 执行配置(设置超时、线程ID):
// 1. 构建执行配置(设置超时时间、线程ID,用于状态持久化)
RunnableConfig config = RunnableConfig.builder()
.timeout(Duration.ofSeconds(30)) // 超时时间30秒
.threadId("user-123456") // 线程ID,用于关联对话状态
.metadata(Map.of("userId", "123", "requestId", "req-789")) // 传递元数据
.build();
// 2. 带配置的调用
try {
AssistantMessage response = agent.call("查询今天北京的天气", config);
System.out.println("AI回复:" + response.getContent());
} catch (GraphRunnerException e) {
System.err.println("Agent执行失败:" + e.getMessage());
}
案例 3 ,多轮对话(传入消息列表)
// 1. 构建消息历史列表(多轮对话上下文)
List<Message> messages = new ArrayList<>();
messages.add(new UserMessage("介绍下Spring AI的核心组件"));
messages.add(new AssistantMessage("Spring AI的核心组件包括Agent、ChatModel、Tool、Prompt等,其中Agent负责推理与工具调用。"));
messages.add(new UserMessage("详细说下Agent组件的作用"));
// 2. 调用call方法,传入消息列表(保持上下文)
try {
AssistantMessage response = agent.call(messages);
System.out.println("AI回复:" + response.getContent());
} catch (GraphRunnerException e) {
System.err.println("Agent执行失败:" + e.getMessage());
}
案例 4 ,自定义 Map 输入(扩展场景,透传业务参数)
// 1. 构建自定义输入Map(包含保留关键字messages,以及自定义业务参数)
Map<String, Object> inputs = new HashMap<>();
// 保留关键字:messages(传入用户消息)
inputs.put("messages", List.of(new UserMessage("根据用户角色推荐合适的功能")));
// 自定义业务参数:用户角色(用于提示词占位符替换)
inputs.put("userRole", "管理员");
// 2. 调用call方法,传入自定义Map
try {
AssistantMessage response = agent.call(inputs);
System.out.println("AI回复:" + response.getContent());
} catch (GraphRunnerException e) {
System.err.println("Agent执行失败:" + e.getMessage());
}
2.2 invoke()
除了 call() 系列方法,ReactAgent 还继承了抽象基类 Agent 的两个同步执行方法 invoke() 和 invokeAndGetOutput(),二者更偏向底层,返回全局状态或自定义输出,适用于高级场景。

2.2.1 核心源码解析
invoke() 同步执行返回完整执行状态(OverAllState ),支持多种不同类型的输入(后续会全面介绍):
/**
* 底层同步执行方法,返回全局状态(OverAllState)
* 适用于需要获取完整执行状态的场景(如:查看工具调用记录、对话历史、自定义状态)
* @param inputs 输入参数Map
* @param config 执行配置
* @return 执行后的全局状态(Optional包装)
* @throws GraphRunnerException 执行图异常
*/
@Override
public Optional<OverAllState> invoke(Map<String, Object> inputs, RunnableConfig config) throws GraphRunnerException {
// 调用底层执行图,返回完整全局状态(包含messages、工具调用记录、自定义状态等)
return doInvoke(inputs, config);
}
2.2.2 演示案例
示例 1 ,构建 ReactAgent 实例,配置自定义输出键:
// 1. 构建ReAct Agent实例(重点:配置outputKey,用于invokeAndGetOutput提取结果)
ReactAgent agent = new DefaultBuilder()
.name("invoke-demo-agent")
.model(chatModel) // 已配置ChatModel(如DeepSeek、OpenAI)
.systemPrompt("查询指定城市天气,仅返回天气结果(格式:城市+天气+温度,无需多余描述)")
.outputKey("weatherResult") // 配置自定义输出键,用于后续提取结果
.build();
// 2. 构建输入参数(查询上海明天的气温)
Map<String, Object> inputs = Map.of("input", "查询上海明天的气温");
示例 2 ,invoke 方法获取完整全局状态(适用于需要查看工具调用、对话历史等场景):
try {
Optional<OverAllState> state = agent.invoke(inputs, null);
if (state.isPresent()) {
OverAllState overAllState = state.get();
// 从状态中提取助手消息(手动提取,类似extractAssistantMessage逻辑)
AssistantMessage assistantMessage = overAllState.value("messages")
.stream()
.flatMap(list -> ((List<?>) list).stream())
.filter(msg -> msg instanceof AssistantMessage)
.map(msg -> (AssistantMessage) msg)
.reduce((a, b) -> b)
.orElseThrow(() -> new AgentException("未找到助手消息"));
// 提取工具调用记录(若有)
List<ToolCall> toolCalls = overAllState.value("toolCalls")
.map(calls -> (List<ToolCall>) calls)
.orElse(Collections.emptyList());
System.out.println("【invoke方法】AI回复:" + assistantMessage.getContent());
System.out.println("【invoke方法】工具调用记录:" + toolCalls);
}
} catch (GraphRunnerException | AgentException e) {
System.err.println("invoke方法执行失败:" + e.getMessage());
}
2.3 invokeAndGetOutput()
2.3.1 核心源码解析
invokeAndGetOutput() 同步执行返回节点输出(NodeOutput ),支持多种不同类型的输入(后续会全面介绍):
/**
* 同步执行 Agent 并返回节点输出(字符串输入)。
*
* @param message 用户输入的字符串消息
* @return 节点输出,可能为空
* @throws GraphRunnerException 图执行异常
*/
public Optional<NodeOutput> invokeAndGetOutput(String message) throws GraphRunnerException {
Map<String, Object> inputs = buildMessageInput(message);
return doInvokeAndGetOutput(inputs, null);
}
2.3.2 演示案例
示例 1 ,invokeAndGetOutput 直接提取自定义输出(适用于仅需结果、无需完整状态场景):
try {
// 调用invokeAndGetOutput,通过提前配置的outputKey提取结果
Optional<Object> output = agent.invokeAndGetOutput(inputs, null);
// 结果强转(需根据实际返回类型转换,此处为String)
output.ifPresent(result -> System.out.println("【invokeAndGetOutput方法】提取结果:" + (String) result));
// 处理无结果场景
if (output.isEmpty()) {
System.out.println("【invokeAndGetOutput方法】未提取到结果(可能未配置outputKey或执行失败)");
}
} catch (GraphRunnerException e) {
System.err.println("invokeAndGetOutput方法执行失败:" + e.getMessage());
}
3. 流式执行
流式执行的核心特点是非阻塞式执行,即调用方法后,不等待全流程完成,而是通过流式回调(Flux)实时返回执行结果(如 LLM 逐字生成的回复、工具调用的中间状态),适用于需要实时展示结果的场景(如:聊天机器人、实时问答)。
ReactAgent 的流式执行能力继承自 Agent 抽象基类,核心方法为 stream() 和 streamMessages(),二者均支持订阅式获取实时结果。

3.1 stream()
3.1.1 核心源码解析
stream 流式执行方法返回原始节点流( Flux<NodeOutput>),支持多种不同类型的输入(后续会全面介绍):
/**
* 【流式执行】使用纯文本字符串调用Agent,返回节点输出流
* <p>返回完整的节点执行信息,包含图执行的每个步骤详情</p>
*
* @param message 用户输入的文本消息
* @return 节点输出流,每个NodeOutput包含节点执行结果
* @throws GraphRunnerException 图执行过程中抛出的异常
*/
public Flux<NodeOutput> stream(String message) throws GraphRunnerException {
Map<String, Object> inputs = buildMessageInput(message);
return doStream(inputs, buildStreamConfig(null));
}
3.1.2 演示案例
案例1 ,基础流式执行(实时获取 AI 回复+工具调用中间消息)
// 1. 构建输入参数(查询天气,会触发工具调用)
Map<String, Object> inputs = Map.of("input", "查询广州今天的天气");
// 2. 调用stream方法,订阅流式结果
agent.stream(inputs, null)
.subscribe(
// 正常接收消息(实时推送)
message -> {
if (message instanceof AssistantMessage) {
// 接收LLM流式回复(逐字输出)
System.out.print("AI回复:" + ((AssistantMessage) message).getContent());
} else if (message instanceof ToolMessage) {
// 接收工具调用中间消息(如工具调用请求、返回结果)
ToolMessage toolMessage = (ToolMessage) message;
System.out.println("\n工具调用:" + toolMessage.getToolName() + ",结果:" + toolMessage.getContent());
}
},
// 异常处理
error -> System.err.println("流式执行异常:" + error.getMessage()),
// 执行完成回调
() -> System.out.println("\n流式执行完成")
);
3.2 streamMessages()
3.2.1 核心源码解析
streamMessages 流式执行方法返回消息流( Flux<Message>),支持多种不同类型的输入(后续会全面介绍):
/**
* 【流式执行】使用纯文本字符串调用Agent,返回消息流(便捷API)
* <p>基于 stream() 方法构建的便捷层,自动过滤掉中间节点输出,仅返回用户关心的消息</p>
* <p>适用于实时对话、前端流式展示等不需要关注图执行细节的场景</p>
*
* @param message 用户输入的文本消息
* @return 消息流,按生成顺序逐条发出
* @throws GraphRunnerException 图执行过程中抛出的异常
*/
public Flux<Message> streamMessages(String message) throws GraphRunnerException {
return stream(message)
.transform(this::extractMessages);
}
3.2.2 演示案例
案例1 , 简化流式执行(仅获取 AI 回复,忽略中间过程):
// 1. 构建多轮对话消息列表
List<Message> messages = new ArrayList<>();
messages.add(new UserMessage("什么是ReAct Agent?"));
messages.add(new AssistantMessage("ReAct Agent是一种基于ReAct循环的智能体,核心是推理-行动-观察。"));
messages.add(new UserMessage("详细说下ReAct循环的步骤"));
// 2. 调用streamMessages方法,仅订阅AssistantMessage
agent.streamMessages(messages, null)
.subscribe(
message -> {
// 仅接收AI流式回复,逐字打印
System.out.print(((AssistantMessage) message).getContent());
},
error -> System.err.println("流式执行异常:" + error.getMessage()),
() -> System.out.println("\n回复完成")
);
为武汉地区的开发者提供学习、交流和合作的平台。社区聚集了众多技术爱好者和专业人士,涵盖了多个领域,包括人工智能、大数据、云计算、区块链等。社区定期举办技术分享、培训和活动,为开发者提供更多的学习和交流机会。
更多推荐


所有评论(0)