1. 前言

上一篇我们深度解析了 ReactAgent 构建器(Builder/DefaultBuilder)的源码逻辑,完成 Agent 实例化后,核心就是通过执行方法触发智能体的推理-行动-观察闭环

本文将聚焦 ReactAgent 的两大核心执行方式,即:

  • 同步执行:同步方法阻塞等待结果,适用于不需要实时反馈、追求结果完整性的场景
  • 流式执行:通过 Flux 实时推送结果,适用于需要实时展示结果的场景

2. 同步执行

同步执行是 ReactAgent 最常用的执行方式,核心特点是阻塞式执行,即调用方法后,需等待智能体完成全流程(推理、工具调用、结果生成),才能获取最终回复。

2.1 call()

ReactAgent 提供 8 个重载的 call() 方法,均返回 AssistantMessage(助手回复消息),所有重载方法最终均委托给 doMessageInvoke() 执行,覆盖从简单字符串输入到复杂多轮对话、自定义参数输入的全场景,适配不同业务需求。

在这里插入图片描述

2.1.1 源码解析

最简调用方式 - 字符串输入:

/**
 * 最简调用方式 - 字符串输入
 *
 * 将字符串转换为 UserMessage 并执行 agent,返回 AI 的回复。
 * 适用于简单的单次对话场景(如:单次提问、简单指令)。
 *
 * @param message 用户输入的字符串消息
 * @return AI 生成的 AssistantMessage 回复
 * @throws GraphRunnerException 如果图执行过程中发生错误
 */
public AssistantMessage call(String message) throws GraphRunnerException {
    // 字符串消息,无配置 → 委托给核心执行方法
    return doMessageInvoke(message, null);
}

字符串输入 + 配置控制:

/**
 * 字符串输入 + 配置控制
 *
 * 允许通过 RunnableConfig 控制执行行为,如:
 * 
 *   threadId: 设置线程 ID,用于状态持久化(断点续跑)
 *   metadata: 传递额外的元数据(如用户ID、请求ID)
 *   timeout: 设置执行超时时间(避免阻塞过久)
 * 
 * 适用于需要控制执行行为的单次对话场景。
 *
 * @param message 用户输入的字符串消息
 * @param config 运行时配置,控制执行行为
 * @return AI 生成的 AssistantMessage 回复
 * @throws GraphRunnerException 如果图执行过程中发生错误
 */
public AssistantMessage call(String message, RunnableConfig config) throws GraphRunnerException {
    // 字符串消息 + 自定义执行配置 → 委托核心执行方法
    return doMessageInvoke(message, config);
}

UserMessage 输入:

/**
 * UserMessage 输入 - 支持复杂消息构造
 *
 * UserMessage 可以包含:
 * 
 *   文本内容
 *   多媒体内容 (图片、文件等)
 *   元数据和注释
 * 
 * 适用于包含多媒体、复杂元数据的单次对话场景(如:图片识别提问)。
 *
 * @param message 构造好的 UserMessage 实例
 * @return AI 生成的 AssistantMessage 回复
 * @throws GraphRunnerException 如果图执行过程中发生错误
 */
public AssistantMessage call(UserMessage message) throws GraphRunnerException {
    // 复杂消息对象,无配置 → 委托核心执行方法
    return doMessageInvoke(message, null);
}

UserMessage 输入+ 配置控制:

/**
 * UserMessage 输入 + 配置控制
 *
 * @param message 构造好的 UserMessage 实例
 * @param config 运行时配置,控制执行行为
 * @return AI 生成的 AssistantMessage 回复
 * @throws GraphRunnerException 如果图执行过程中发生错误
 */
public AssistantMessage call(UserMessage message, RunnableConfig config) throws GraphRunnerException {
    // 复杂消息对象 + 自定义执行配置 → 委托核心执行方法
    return doMessageInvoke(message, config);
}

消息列表输入:

/**
 * 消息列表输入 - 支持多轮对话
 *
 * 传入完整的消息历史,适用于:
 * 
 *   多轮对话场景(如:连续提问、上下文关联对话)
 *   需要保持上下文的对话(如:聊天机器人)
 *   恢复之前的对话状态(结合 threadId 持久化)
 * 

 * @param messages 消息历史列表(包含 UserMessage、AssistantMessage 交替)
 * @return AI 生成的 AssistantMessage 回复
 * @throws GraphRunnerException 如果图执行过程中发生错误
 */
public AssistantMessage call(List<Message> messages) throws GraphRunnerException {
    // 多轮对话消息列表,无配置 → 委托核心执行方法
    return doMessageInvoke(messages, null);
}

消息列表输入 + 配置控制:

/**
 * 消息列表输入 + 配置控制
 *
 * @param messages 消息历史列表
 * @param config 运行时配置,控制执行行为
 * @return AI 生成的 AssistantMessage 回复
 * @throws GraphRunnerException 如果图执行过程中发生错误
 */
public AssistantMessage call(List<Message> messages, RunnableConfig config) throws GraphRunnerException {
    // 多轮对话消息列表 + 自定义执行配置 → 委托核心执行方法
    return doMessageInvoke(messages, config);
}

自定义 Map 参数输入:

/**
 * 自定义Map参数输入 - 支持扩展输入
 *
 * 可传入自定义键值对参数,除固定保留键外,其他键会作为图状态传递
 * 保留关键字:messages、input,用于传递用户问题/输入
 * 其他自定义键:可用于提示词占位符、自定义状态值(如:用户角色、业务参数)
 * 适用于高级扩展场景(如:自定义提示词占位符替换、业务参数透传)。
 *
 * @param inputs 输入参数Map(保留关键字:messages、input)
 * @return AI 生成的 AssistantMessage 回复
 * @throws GraphRunnerException 图执行失败时抛出
 */
public AssistantMessage call(Map<String, Object> inputs) throws GraphRunnerException {
    // 自定义Map输入,无配置 → 委托核心执行方法
    return doMessageInvoke(inputs, null);
}

自定义 Map 参数+ 配置控制输入:

/**
 * 自定义Map参数输入 + 配置控制
 *
 * 可传入自定义键值对参数,除固定保留键外,其他键会作为图状态传递
 * 保留关键字:messages、input,用于传递用户问题/输入
 *
 * @param inputs 输入参数Map(保留关键字:messages、input)
 * @param config 运行时配置,控制执行行为
 * @return AI 生成的 AssistantMessage 回复
 * @throws GraphRunnerException 图执行失败时抛出
 */
public AssistantMessage call(Map<String, Object> inputs, RunnableConfig config) throws GraphRunnerException {
    // 自定义Map输入 + 执行配置 → 委托核心执行方法
    return doMessageInvoke(inputs, config);
}

所有 call() 重载方法均委托给 doMessageInvoke() 执行,该方法负责统一处理消息格式、调用底层执行逻辑、提取最终回复。

提供两个重载,分别处理任意类型消息和自定义 Map 消息:

/**
 * 通用对象类型消息执行入口
 * 接收任意类型消息(String/UserMessage/List<Message>),统一构建为Map输入
 * @param message 任意类型用户消息(字符串/单消息/消息列表)
 * @param config 执行配置(可为null)
 * @return 最终AI回复消息
 * @throws GraphRunnerException 执行图异常
 */
private AssistantMessage doMessageInvoke(Object message, RunnableConfig config) throws GraphRunnerException {
    // 1. 将任意类型消息统一构建为标准Map输入格式(适配底层执行图要求)
    Map<String, Object> inputs = buildMessageInput(message);
    // 2. 调用底层真正执行方法(doInvoke),执行Agent全流程(推理→工具调用→结果生成)
    // 3. 从执行结果(全局状态)中提取标准的AssistantMessage返回
    return extractAssistantMessage(doInvoke(inputs, config));
}

/**
 * 自定义Map类型消息执行入口
 * 直接接收已构建好的Map参数,跳过消息构建步骤,用于高级场景
 * @param inputs 自定义输入Map(包含messages/input等状态)
 * @param config 执行配置
 * @return 最终AI回复消息
 * @throws GraphRunnerException 执行图异常
 */
private AssistantMessage doMessageInvoke(Map<String, Object> inputs, RunnableConfig config) throws GraphRunnerException {
    // 直接执行Agent流程,并提取结果返回(跳过消息格式转换,提升效率)
    return extractAssistantMessage(doInvoke(inputs, config));
}

extractAssistantMessage() 方法负责从 Agent 执行后的全局状态(OverAllState)中提取 AssistantMessage,支持两种提取策略,适配不同配置场景,同时包含异常处理,保证回复提取的稳定性。

源码如下:

/**
 * 从全局状态中提取助手消息
 * 支持两种提取策略:
 * 1. 若指定了outputKey,直接通过key获取对应助手消息(适用于自定义输出键场景)
 * 2. 未指定key时,从messages状态列表中获取【最后一条】助手消息(默认策略)
 * @param state 全局状态对象(Optional包装,允许为空)
 * @return 提取到的AssistantMessage
 * @throws IllegalStateException 指定outputKey但状态中无对应值时抛出
 * @throws AgentException 未找到任何有效的AssistantMessage时抛出
 */
private AssistantMessage extractAssistantMessage(Optional<OverAllState> state) {
    // 策略1:配置了outputKey,直接通过指定key从状态中提取消息
    if (StringUtils.hasLength(outputKey)) {
        return state.flatMap(s -> s.value(outputKey))
                    // 将状态值强转为AssistantMessage类型(确保类型安全)
                    .map(msg -> (AssistantMessage) msg)
                    // 未找到指定key的消息,抛出非法状态异常(快速定位配置问题)
                    .orElseThrow(() -> new IllegalStateException("Output key " + outputKey + " not found in agent state"));
    }

    // 策略2:未配置outputKey,从messages列表中提取最后一条AssistantMessage(符合多轮对话逻辑)
    // 消息转换时增加校验逻辑,避免类型转换异常
    return state.flatMap(s -> s.value("messages"))
                .stream()
                // 将状态中的messages对象强转为列表,并流式处理
                .flatMap(messageList -> ((List<?>) messageList).stream()
                        // 过滤出所有AssistantMessage类型的消息(排除UserMessage等其他类型)
                        .filter(msg -> msg instanceof AssistantMessage)
                        .map(msg -> (AssistantMessage) msg))
                // 取列表中【最后一条】助手消息(reduce((a,b)->b) 经典取最后一个元素用法)
                .reduce((first, second) -> second)
                // 未找到任何助手消息,抛出自定义异常(明确执行失败原因)
                .orElseThrow(() -> new AgentException("No AssistantMessage found in 'messages' state"));
}

2.1.2 演示案例

案例 1 ,最简字符串输入(单次对话)

// 1. 假设已构建ReAct Agent实例
ReactAgent agent = new DefaultBuilder()
        .name("simple-agent")
        .model(chatModel) // 已配置ChatModel(如DeepSeek、OpenAI)
        .systemPrompt("你是一个简单的问答助手,直接回答用户问题")
        .build();

// 2. 最简调用:字符串输入
try {
    AssistantMessage response = agent.call("Spring AI是什么?");
    // 输出AI回复内容
    System.out.println("AI回复:" + response.getContent());
} catch (GraphRunnerException e) {
    // 处理执行异常(如模型调用失败、工具执行异常)
    System.err.println("Agent执行失败:" + e.getMessage());
}

案例 2 ,字符串输入 + 执行配置(设置超时、线程ID):

// 1. 构建执行配置(设置超时时间、线程ID,用于状态持久化)
RunnableConfig config = RunnableConfig.builder()
        .timeout(Duration.ofSeconds(30)) // 超时时间30秒
        .threadId("user-123456") // 线程ID,用于关联对话状态
        .metadata(Map.of("userId", "123", "requestId", "req-789")) // 传递元数据
        .build();

// 2. 带配置的调用
try {
    AssistantMessage response = agent.call("查询今天北京的天气", config);
    System.out.println("AI回复:" + response.getContent());
} catch (GraphRunnerException e) {
    System.err.println("Agent执行失败:" + e.getMessage());
}

案例 3 ,多轮对话(传入消息列表)

// 1. 构建消息历史列表(多轮对话上下文)
List<Message> messages = new ArrayList<>();
messages.add(new UserMessage("介绍下Spring AI的核心组件"));
messages.add(new AssistantMessage("Spring AI的核心组件包括Agent、ChatModel、Tool、Prompt等,其中Agent负责推理与工具调用。"));
messages.add(new UserMessage("详细说下Agent组件的作用"));

// 2. 调用call方法,传入消息列表(保持上下文)
try {
    AssistantMessage response = agent.call(messages);
    System.out.println("AI回复:" + response.getContent());
} catch (GraphRunnerException e) {
    System.err.println("Agent执行失败:" + e.getMessage());
}

案例 4 ,自定义 Map 输入(扩展场景,透传业务参数)

// 1. 构建自定义输入Map(包含保留关键字messages,以及自定义业务参数)
Map<String, Object> inputs = new HashMap<>();
// 保留关键字:messages(传入用户消息)
inputs.put("messages", List.of(new UserMessage("根据用户角色推荐合适的功能")));
// 自定义业务参数:用户角色(用于提示词占位符替换)
inputs.put("userRole", "管理员");

// 2. 调用call方法,传入自定义Map
try {
    AssistantMessage response = agent.call(inputs);
    System.out.println("AI回复:" + response.getContent());
} catch (GraphRunnerException e) {
    System.err.println("Agent执行失败:" + e.getMessage());
}

2.2 invoke()

除了 call() 系列方法,ReactAgent 还继承了抽象基类 Agent 的两个同步执行方法 invoke()invokeAndGetOutput(),二者更偏向底层,返回全局状态或自定义输出,适用于高级场景。

在这里插入图片描述

2.2.1 核心源码解析

invoke() 同步执行返回完整执行状态(OverAllState ),支持多种不同类型的输入(后续会全面介绍):

/**
 * 底层同步执行方法,返回全局状态(OverAllState)
 * 适用于需要获取完整执行状态的场景(如:查看工具调用记录、对话历史、自定义状态)
 * @param inputs 输入参数Map
 * @param config 执行配置
 * @return 执行后的全局状态(Optional包装)
 * @throws GraphRunnerException 执行图异常
 */
@Override
public Optional<OverAllState> invoke(Map<String, Object> inputs, RunnableConfig config) throws GraphRunnerException {
    // 调用底层执行图,返回完整全局状态(包含messages、工具调用记录、自定义状态等)
    return doInvoke(inputs, config);
}

2.2.2 演示案例

示例 1 ,构建 ReactAgent 实例,配置自定义输出键:

// 1. 构建ReAct Agent实例(重点:配置outputKey,用于invokeAndGetOutput提取结果)
ReactAgent agent = new DefaultBuilder()
        .name("invoke-demo-agent")
        .model(chatModel) // 已配置ChatModel(如DeepSeek、OpenAI)
        .systemPrompt("查询指定城市天气,仅返回天气结果(格式:城市+天气+温度,无需多余描述)")
        .outputKey("weatherResult") // 配置自定义输出键,用于后续提取结果
        .build();
// 2. 构建输入参数(查询上海明天的气温)
Map<String, Object> inputs = Map.of("input", "查询上海明天的气温");

示例 2invoke 方法获取完整全局状态(适用于需要查看工具调用、对话历史等场景):

try {
    Optional<OverAllState> state = agent.invoke(inputs, null);
    if (state.isPresent()) {
        OverAllState overAllState = state.get();
        // 从状态中提取助手消息(手动提取,类似extractAssistantMessage逻辑)
        AssistantMessage assistantMessage = overAllState.value("messages")
                .stream()
                .flatMap(list -> ((List<?>) list).stream())
                .filter(msg -> msg instanceof AssistantMessage)
                .map(msg -> (AssistantMessage) msg)
                .reduce((a, b) -> b)
                .orElseThrow(() -> new AgentException("未找到助手消息"));
        
        // 提取工具调用记录(若有)
        List<ToolCall> toolCalls = overAllState.value("toolCalls")
                .map(calls -> (List<ToolCall>) calls)
                .orElse(Collections.emptyList());
        
        System.out.println("【invoke方法】AI回复:" + assistantMessage.getContent());
        System.out.println("【invoke方法】工具调用记录:" + toolCalls);
    }
} catch (GraphRunnerException | AgentException e) {
    System.err.println("invoke方法执行失败:" + e.getMessage());
}

2.3 invokeAndGetOutput()

2.3.1 核心源码解析

invokeAndGetOutput() 同步执行返回节点输出(NodeOutput ),支持多种不同类型的输入(后续会全面介绍):

	/**
	 * 同步执行 Agent 并返回节点输出(字符串输入)。
	 *
	 * @param message 用户输入的字符串消息
	 * @return 节点输出,可能为空
	 * @throws GraphRunnerException 图执行异常
	 */
	public Optional<NodeOutput> invokeAndGetOutput(String message) throws GraphRunnerException {
		Map<String, Object> inputs = buildMessageInput(message);
		return doInvokeAndGetOutput(inputs, null);
	}

2.3.2 演示案例

示例 1invokeAndGetOutput 直接提取自定义输出(适用于仅需结果、无需完整状态场景):

try {
    // 调用invokeAndGetOutput,通过提前配置的outputKey提取结果
    Optional<Object> output = agent.invokeAndGetOutput(inputs, null);
    // 结果强转(需根据实际返回类型转换,此处为String)
    output.ifPresent(result -> System.out.println("【invokeAndGetOutput方法】提取结果:" + (String) result));
    // 处理无结果场景
    if (output.isEmpty()) {
        System.out.println("【invokeAndGetOutput方法】未提取到结果(可能未配置outputKey或执行失败)");
    }
} catch (GraphRunnerException e) {
    System.err.println("invokeAndGetOutput方法执行失败:" + e.getMessage());
}

3. 流式执行

流式执行的核心特点是非阻塞式执行,即调用方法后,不等待全流程完成,而是通过流式回调(Flux)实时返回执行结果(如 LLM 逐字生成的回复、工具调用的中间状态),适用于需要实时展示结果的场景(如:聊天机器人、实时问答)。

ReactAgent 的流式执行能力继承自 Agent 抽象基类,核心方法为 stream()streamMessages(),二者均支持订阅式获取实时结果。

在这里插入图片描述

3.1 stream()

3.1.1 核心源码解析

stream 流式执行方法返回原始节点流Flux<NodeOutput>),支持多种不同类型的输入(后续会全面介绍):

	/**
	 * 【流式执行】使用纯文本字符串调用Agent,返回节点输出流
	 * <p>返回完整的节点执行信息,包含图执行的每个步骤详情</p>
	 *
	 * @param message 用户输入的文本消息
	 * @return 节点输出流,每个NodeOutput包含节点执行结果
	 * @throws GraphRunnerException 图执行过程中抛出的异常
	 */
	public Flux<NodeOutput> stream(String message) throws GraphRunnerException {
		Map<String, Object> inputs = buildMessageInput(message);
		return doStream(inputs, buildStreamConfig(null));
	}

3.1.2 演示案例

案例1 ,基础流式执行(实时获取 AI 回复+工具调用中间消息)

// 1. 构建输入参数(查询天气,会触发工具调用)
Map<String, Object> inputs = Map.of("input", "查询广州今天的天气");

// 2. 调用stream方法,订阅流式结果
agent.stream(inputs, null)
        .subscribe(
                // 正常接收消息(实时推送)
                message -> {
                    if (message instanceof AssistantMessage) {
                        // 接收LLM流式回复(逐字输出)
                        System.out.print("AI回复:" + ((AssistantMessage) message).getContent());
                    } else if (message instanceof ToolMessage) {
                        // 接收工具调用中间消息(如工具调用请求、返回结果)
                        ToolMessage toolMessage = (ToolMessage) message;
                        System.out.println("\n工具调用:" + toolMessage.getToolName() + ",结果:" + toolMessage.getContent());
                    }
                },
                // 异常处理
                error -> System.err.println("流式执行异常:" + error.getMessage()),
                // 执行完成回调
                () -> System.out.println("\n流式执行完成")
        );

3.2 streamMessages()

3.2.1 核心源码解析

streamMessages 流式执行方法返回消息流Flux<Message>),支持多种不同类型的输入(后续会全面介绍):

/**
 * 【流式执行】使用纯文本字符串调用Agent,返回消息流(便捷API)
 * <p>基于 stream() 方法构建的便捷层,自动过滤掉中间节点输出,仅返回用户关心的消息</p>
 * <p>适用于实时对话、前端流式展示等不需要关注图执行细节的场景</p>
 *
 * @param message 用户输入的文本消息
 * @return 消息流,按生成顺序逐条发出
 * @throws GraphRunnerException 图执行过程中抛出的异常
 */
public Flux<Message> streamMessages(String message) throws GraphRunnerException {
	return stream(message)
			.transform(this::extractMessages);
}

3.2.2 演示案例

案例1 , 简化流式执行(仅获取 AI 回复,忽略中间过程):

// 1. 构建多轮对话消息列表
List<Message> messages = new ArrayList<>();
messages.add(new UserMessage("什么是ReAct Agent?"));
messages.add(new AssistantMessage("ReAct Agent是一种基于ReAct循环的智能体,核心是推理-行动-观察。"));
messages.add(new UserMessage("详细说下ReAct循环的步骤"));

// 2. 调用streamMessages方法,仅订阅AssistantMessage
agent.streamMessages(messages, null)
        .subscribe(
                message -> {
                    // 仅接收AI流式回复,逐字打印
                    System.out.print(((AssistantMessage) message).getContent());
                },
                error -> System.err.println("流式执行异常:" + error.getMessage()),
                () -> System.out.println("\n回复完成")
        );
Logo

为武汉地区的开发者提供学习、交流和合作的平台。社区聚集了众多技术爱好者和专业人士,涵盖了多个领域,包括人工智能、大数据、云计算、区块链等。社区定期举办技术分享、培训和活动,为开发者提供更多的学习和交流机会。

更多推荐