实战 LangGraph 循环执行:构建带自动重试的并行任务流

在构建复杂 AI 工作流时,我们经常会遇到不稳定的外部服务调用、需要重试的异步任务场景 —— 比如并行获取天气、时间等第三方数据,部分接口可能随机失败,这时候就需要工作流具备自动循环重试能力,直到任务成功或达到最大重试次数。

LangGraph 作为强大的工作流编排框架,完美支持循环执行逻辑,无需复杂的嵌套代码,就能通过状态管理 + 条件决策,实现优雅的循环重试流程。今天就和大家分享,如何用 LangGraph 打造一个支持并行任务、自动循环重试的完整工作流,彻底解决不稳定任务的执行难题。

一、核心场景与需求拆解

我们的目标场景很明确:

  1. 并行执行两个独立任务(获取天气、获取时间),两个任务都存在随机失败概率;
  2. 只要任意一个任务失败,就自动触发循环重试,无需手动重启流程;
  3. 支持设置最大重试次数,避免无限循环,保障流程稳定性;
  4. 全程通过状态管理数据,重试时复用原始输入,自动更新执行结果。

这种模式广泛适用于:第三方 API 重试、异步数据拉取、批量任务容错处理、AI 工具调用容错等实际业务场景,通用性极强。

二、LangGraph 循环执行的核心设计思路

LangGraph 实现循环的核心逻辑,完全围绕 **「状态管理 + 条件决策」** 展开,没有复杂的语法,核心分为三大模块:

1. 状态定义:循环的「数据载体」

首先定义专属的工作流状态,把输入数据、任务结果、重试次数、最大重试数、执行状态全部封装起来。状态是 LangGraph 循环的核心 —— 所有节点的执行、重试的判断、结果的更新,都基于这个状态流转,每一次循环都会自动更新重试次数和任务结果,为决策提供依据。

2. 执行节点:循环的「任务核心」

这是循环的主体节点,负责并行执行目标任务。我们通过线程池实现多任务并行执行,提升效率;执行完成后,会自动判断两个任务是否全部成功,并把结果、重试次数更新到状态中,传递给下一个节点。这个节点是循环的「动作执行者」,每一次重试都会重新执行这个节点。

3. 决策节点:循环的「开关控制器」

这是实现循环的关键核心!决策节点会读取最新的状态,根据规则判断流程走向:

  • ✅ 任务全部成功 → 结束流程;
  • ❌ 任务失败 + 未达最大重试次数 → 回到执行节点,重新循环
  • ❌ 任务失败 + 达到最大重试次数 → 强制结束流程。

通过这个决策节点,LangGraph 轻松实现了闭环循环,这也是它比传统工作流框架更灵活的地方。

三、LangGraph 循环执行的核心优势

对比传统代码写循环 + 重试的方式,LangGraph 的循环执行有三大突出优势:

1. 逻辑解耦,可读性拉满

循环逻辑、任务执行逻辑、决策逻辑完全分离,每个节点只做一件事。后续修改重试次数、调整任务逻辑、新增失败处理,都不用改动核心循环代码,维护成本极低。

2. 状态自动流转,无需手动管理

不用自己定义变量记录重试次数、不用手动传递任务结果,LangGraph 会自动管理状态的更新与传递,彻底避免手动管理状态带来的 bug。

3. 容错性强,适配不稳定场景

针对第三方接口、网络请求等易失败场景,自动重试机制能大幅提升任务成功率;同时限制最大重试次数,兼顾效率与稳定性,生产环境直接可用。

4. 并行 + 循环完美结合

支持在循环节点内执行并行任务,既解决了任务容错问题,又保证了执行效率,复杂工作流也能轻松编排。

四、落地价值与适用场景

这套 LangGraph 循环重试模式,是生产级工作流的必备能力,适用场景非常广泛:

  • 第三方 API 调用容错(天气、地图、支付等接口);
  • AI Agent 工具调用自动重试(LLM 接口、插件调用);
  • 批量数据拉取 / 处理,保障任务完整性;
  • 异步任务执行,失败自动恢复。

它让工作流从「一次性执行」变成「智能容错执行」,大幅提升系统的稳定性和健壮性。

五、总结

LangGraph 的循环执行能力,通过状态驱动 + 条件决策的设计,让复杂的循环重试工作流变得极简、优雅。

我们不需要编写嵌套的循环代码,只需要:定义状态承载数据→编写执行节点处理任务→编写决策节点控制循环,就能快速构建出支持并行、自动重试、容错性强的工作流。

这也是 LangGraph 的核心魅力:用最简洁的编排方式,解决最复杂的工作流问题,无论是 AI 应用还是常规业务流程,都能轻松驾驭。


代码流程图:

图片

 实现代码:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

from langgraph.graph import StateGraph, END

from typing import TypedDict, Annotated

import operator

import time

import random

from concurrent.futures import ThreadPoolExecutor, as_completed

# 1. 定义带重试状态的 State

class LoopState(TypedDict):

    input_data: str                  # 输入数据

    weather_data: str None         # 天气数据

    time_data: str None            # 时间数据

    retry_count: int                 # 重试次数

    max_retries: int                 # 最大重试次数

    is_success: bool                 # 是否执行成功(用于判断循环退出)

# 2. 模拟不稳定的并行任务(可能失败,触发循环)

def fetch_weather(input_data: str-str:

    """模拟天气数据获取(30%概率失败)"""

    print(f"\n[天气任务] 处理: {input_data}")

    if random.random() < 0.3:  # 30%失败概率

        print("[天气任务] ❌ 数据获取失败")

        return ""

    time.sleep(1)  # 模拟耗时

    result = f"{input_data} 天气:25℃ 晴天"

    print(f"[天气任务] ✅ 成功: {result}")

    return result

def fetch_time(input_data: str-str:

    """模拟时间数据获取(20%概率失败)"""

    print(f"\n[时间任务] 处理: {input_data}")

    if random.random() < 0.2:  # 20%失败概率

        print("[时间任务] ❌ 数据获取失败")

        return ""

    time.sleep(1)  # 模拟耗时

    result = f"{input_data} 时间:15:30 下午"

    print(f"[时间任务] ✅ 成功: {result}")

    return result

# 3. 并行执行节点(循环的核心执行节点)

def parallel_execution(state: LoopState) -> LoopState:

    """并行执行天气/时间任务,返回更新后的状态"""

    input_data = state["input_data"]

    retry_count = state["retry_count"+ 1

    print(f"\n{'='*50}")

    print(f"[循环执行] 第 {retry_count} 次重试")

    print('='*50)

    # 线程池并行执行

    with ThreadPoolExecutor(max_workers=2) as executor:

        future_weather = executor.submit(fetch_weather, input_data)

        future_time = executor.submit(fetch_time, input_data)

        # 收集结果

        weather_data = ""

        time_data = ""

        for future in as_completed([future_weather, future_time]):

            if future == future_weather:

                weather_data = future.result()

            elif future == future_time:

                time_data = future.result()

    # 判断本次执行是否成功

    is_success = bool(weather_data and time_data)

    # 返回更新后的状态

    return {

        "input_data": input_data,

        "weather_data": weather_data,

        "time_data": time_data,

        "retry_count": retry_count,

        "max_retries": state["max_retries"],

        "is_success": is_success

    }

# 4. 循环决策节点(判断是否继续重试)

def loop_decision(state: LoopState) -str:

    """

    决策节点:返回下一个节点名称(循环/结束)

    - 成功 → END

    - 失败且未达最大重试 → parallel_execution(继续循环)

    - 失败且达最大重试 → END

    """

    print(f"\n[决策节点] 重试次数: {state['retry_count']}/{state['max_retries']} | 执行成功: {state['is_success']}")

    # 退出条件1:执行成功

    if state["is_success"]:

        print("[决策节点] ✅ 所有任务成功,结束循环")

        return END

    # 退出条件2:达到最大重试次数

    elif state["retry_count"] >= state["max_retries"]:

        print("[决策节点] ❌ 达到最大重试次数,强制结束")

        return END

    # 继续循环:返回并行执行节点名称

    else:

        print("[决策节点] 🔄 任务失败,继续重试")

        return "parallel_execution"

# 5. 初始化节点(设置初始状态)

def init_state(state: LoopState) -> LoopState:

    """初始化循环状态"""

    return {

        "input_data": state["input_data"],

        "weather_data"None,

        "time_data"None,

        "retry_count"0,          # 初始重试次数0

        "max_retries": state["max_retries"],  # 最大重试次数(外部传入)

        "is_success"False        # 初始未成功

    }

# 6. 构建带循环的图

def create_loop_graph():

    graph_builder = StateGraph(LoopState)

    # 添加节点

    graph_builder.add_node("init", init_state)

    graph_builder.add_node("parallel_execution", parallel_execution)

    # 设置入口点

    graph_builder.set_entry_point("init")

    # 添加边:初始化 → 并行执行

    graph_builder.add_edge("init""parallel_execution")

    # 核心:添加条件边(决策节点 → 循环/结束)

    graph_builder.add_conditional_edges(

        source="parallel_execution",       # 源节点:并行执行节点

        path=loop_decision,                # 决策函数(返回下一个节点名称)

        path_map={                         # 路径映射(决策结果 → 目标节点)

            "parallel_execution""parallel_execution",  # 继续循环

            END: END                                     # 结束

        }

    )

    return graph_builder.compile()

# 7. 执行测试

if __name__ == "__main__":

    # 编译图

    loop_graph = create_loop_graph()

    # ========== 修复点1:跳过易报错的 ASCII 绘图 ==========

    # 导出 Mermaid 语法(循环结构也能正常展示)

    # 初始输入

    initial_input = {

        "input_data""北京",

        "max_retries"3  # 最大重试3次

    }

    # 执行循环图

    print("\n" + "="*60)

    print("🚀 开始执行带循环的LangGraph")

    print("="*60)

    start_time = time.time()

    # 调用图

    final_result = loop_graph.invoke(initial_input)

    # 输出结果

    elapsed = time.time() - start_time

    print(f"\n" + "="*60)

    print(f"✅ 执行完成!总耗时: {elapsed:.2f} 秒")

    print(f"🔍 最终状态:")

    print(f"   - 重试次数: {final_result['retry_count']}")

    print(f"   - 执行成功: {final_result['is_success']}")

    print(f"   - 天气数据: {final_result['weather_data'] or '获取失败'}")

    print(f"   - 时间数据: {final_result['time_data'] or '获取失败'}")

    print("="*60)

 
结果输出:


============================================================
🚀 开始执行带循环的LangGraph
============================================================

==================================================
[循环执行] 第 1 次重试
==================================================

[天气任务] 处理: 北京
[天气任务] ❌ 数据获取失败

[时间任务] 处理: 北京
[时间任务] ✅ 成功: 北京 时间:15:30 下午

[决策节点] 重试次数: 1/3 | 执行成功: False
[决策节点] 🔄 任务失败,继续重试

==================================================
[循环执行] 第 2 次重试
==================================================

[天气任务] 处理: 北京

[时间任务] 处理: 北京
[天气任务] ✅ 成功: 北京 天气:25℃ 晴天
[时间任务] ✅ 成功: 北京 时间:15:30 下午

[决策节点] 重试次数: 2/3 | 执行成功: True
[决策节点] ✅ 所有任务成功,结束循环

更多推荐