上个月我的 AI Agent 凌晨 3 点崩了。

不是代码 bug,不是 API 挂了,是 Gemini 突然开始返回空响应——HTTP 200,body 里啥也没有。Agent 傻傻重试了 5 次,每次都以为"网络波动",最后把一整天 1500 次 API 额度全烧光了。

第二天早上我看到账单的时候,血压直接拉满。

这让我意识到一个问题:我们这代人写 AI Agent,99% 的精力花在"让它能干活",几乎没人想"它干砸了怎么办"。 demo 跑通就觉得自己赢了,但你让它跑一个月试试?各种诡异的失败模式会教你做人。

摘要:本文从三个真实生产事故出发,系统拆解 AI Agent 在 7×24 运行中遇到的 6 类典型故障模式,给出重试策略、降级链、熔断器和状态持久化的完整实现方案。读完你会知道怎么让 Agent 在没人看着的时候不把自己玩死。

1. 背景:Demo 和生产之间的鸿沟

先讲三个让我记忆深刻的事故。

事故一:空响应烧配额。 Gemini 2.5 Flash 在凌晨时段偶发返回 {"candidates": []},HTTP 状态码 200,没有任何错误信息。Agent 的重试逻辑只检查 HTTP 状态码,于是它默默重试了 1500 次。

事故二:429 雪崩。 某国产模型在月初配额重置后突然严格限流,Agent 的固定间隔重试全部命中限流窗口,5 个 cron 任务互相踩踏,一小时内触发了 2000+ 次 429。

事故三:Context 膨胀。 一个长任务跑了 6 小时后,对话历史膨胀到 80K tokens,模型开始"忘记"最开始的任务目标,在中间某一步无限循环。

这三个事故指向同一个根因:Agent 框架假设了"理想世界"——API 稳定、模型听话、错误可预测。 真实世界不是这样的。

graph TD
    A[Agent 发起调用] --> B{API 响应}
| B -->|200 + 正常数据| C[继续执行] |
| B -->|200 + 空响应| D[ 传统重试:当网络波动处理] |
| B -->|429 限流| E[ 固定间隔重试:雪崩] |
| B -->|500 服务端错误| F[ 指数退避重试] |
| B -->|连接超时| G[ 指数退避重试] |
    D --> H[烧配额/死循环]
    E --> I[多任务互相踩踏]
    F --> J[可能恢复]
    G --> J

说白了就是:重试策略单薄 = 定时炸弹。

2. 六类故障模式分类

跑了三个月 7×24 cron,我总结出 AI Agent 会遇到的故障一共六类:

故障类型 典型表现 发生频率 危害等级 传统重试有效?
网络层 连接超时、DNS 失败、TLS 握手失败 高(日均 3-8 次) ✅ 指数退避有效
HTTP 层 429、502、503、504 中(日均 1-3 次) 中-高 ⚠️ 需区分状态码
应用层-空响应 200 但 body 为空或 results=[] 低(周均 1-2 次) ❌ 传统策略完全无效
应用层-内容异常 模型输出格式错误、幻觉、截断 ❌ 需内容校验
配额/计费 429(超配额)、402(欠费) 极高 ❌ 重试会烧钱
Context 层 Token 超限、对话历史膨胀、目标漂移 长任务高频 ❌ 需主动管理

这里面最阴险的是「应用层-空响应」——HTTP 库告诉你一切正常,但 payload 是空的。大部分 Agent 框架的异常处理根本覆盖不到这个 case。

3. 可靠性工具箱

3.1 智能重试:不只是指数退避

普通的指数退避长这样:

import time
import random

def naive_retry(func, max_retries=3, base_delay=1):
    for attempt in range(max_retries):
        try:
            return func()
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
            time.sleep(delay)

这个实现有三个致命问题:
1. 不区分异常类型——429 和 500 用同一个策略
2. 没有 max_delay 上限——第 10 次重试会等 1024 秒
3. 不检查响应内容——200 + 空 body 直接放过

生产级版本:

import time
import random
from enum import Enum
from dataclasses import dataclass
from typing import Optional, Callable, Any

class RetryDecision(Enum):
    RETRY = "retry"
    FAIL_FAST = "fail_fast"      # 立即失败,不重试
    SWITCH_MODEL = "switch_model" # 降级到备用模型

@dataclass
class RetryConfig:
    max_retries: int = 4
    base_delay: float = 1.0
    max_delay: float = 60.0
    jitter: bool = True

def classify_error(response: Any, exception: Optional[Exception]) -> RetryDecision:
    """核心:根据错误类型决定重试策略"""
    if exception is not None:
        err_str = str(exception).lower()
        # 连接层错误 → 可以重试
        if any(kw in err_str for kw in ("timeout", "connection", "reset")):
            return RetryDecision.RETRY
        # DNS/TLS 错误 → 快速失败(重试没用)
        if any(kw in err_str for kw in ("name resolution", "ssl", "tls")):
            return RetryDecision.FAIL_FAST

    # HTTP 状态码判断
    status = getattr(response, 'status_code', None)
    if status == 429:
        return RetryDecision.SWITCH_MODEL  # 限流 → 换模型
    if status in (402, 403):
        return RetryDecision.FAIL_FAST     # 配额/鉴权 → 立即停
    if status and status >= 500:
        return RetryDecision.RETRY         # 服务端错误 → 重试

    # 最难搞的:200 但空响应
    if status == 200:
        body = getattr(response, 'json', lambda: {})()
        if not body or not body.get("choices"):
            return RetryDecision.SWITCH_MODEL  # 空响应 → 换模型

    return RetryDecision.RETRY

def smart_retry(func: Callable, config: RetryConfig = RetryConfig()):
    last_exception = None
    for attempt in range(config.max_retries + 1):
        try:
            response = func()
            decision = classify_error(response, None)
            if decision == RetryDecision.RETRY and attempt < config.max_retries:
                delay = min(config.base_delay * (2 ** attempt), config.max_delay)
                if config.jitter:
                    delay += random.uniform(0, delay * 0.3)
                time.sleep(delay)
                continue
            return response, decision
        except Exception as e:
            last_exception = e
            decision = classify_error(None, e)
            if decision == RetryDecision.FAIL_FAST:
                raise
            if decision == RetryDecision.SWITCH_MODEL:
                raise  # 外层捕获后切换模型
            if attempt < config.max_retries:
                delay = min(config.base_delay * (2 ** attempt), config.max_delay)
                time.sleep(delay)
    raise last_exception

三个关键改进:
- classify_error 把错误分到三个桶:重试 / 立即失败 / 换模型
- max_delay 封顶 防止退避时间爆炸
- jitter 用 30% 随机抖动避免惊群效应

3.2 降级链:不要在一棵树上吊死

这是我用了两个月后沉淀下来的模型降级链:

主模型 (Claude Sonnet 4)
  ↓ 429/空响应
备用模型一 (Gemini 2.5 Pro)
  ↓ 也挂了
备用模型二 (DeepSeek V3)
  ↓ 全挂了
本地兜底 (Ollama + Qwen 7B)
  ↓ 本地都挂了
降级为预设回复 / 标记为待人工处理

实现:

from typing import List, Dict

class ModelFallbackChain:
    def __init__(self, models: List[Dict]):
        """
        models: [
            {"name": "claude-sonnet-4", "provider": "anthropic", "cost_per_1k": 0.003},
            {"name": "gemini-2.5-pro", "provider": "google", "cost_per_1k": 0.00125},
            ...
        ]
        """
        self.models = models
        self.current_index = 0
        self.failure_counts: Dict[str, int] = {}

    def get_model(self) -> Dict:
        if self.current_index >= len(self.models):
            raise RuntimeError("所有模型已耗尽,任务需要人工介入")
        return self.models[self.current_index]

    def fallback(self, reason: str) -> Dict:
        name = self.models[self.current_index]["name"]
        self.failure_counts[name] = self.failure_counts.get(name, 0) + 1
        self.current_index += 1

        # 熔断:同一个模型失败超过阈值,跳过它 5 分钟
        if self.failure_counts[name] >= 3:
            print(f"⚠️ {name} 已触发熔断,5分钟内不再使用")

        return self.get_model()

    def reset(self):
        self.current_index = 0

实际效果:空响应问题从"烧光配额"变成"多等 2 秒自动切模型",用户感知不到。

3.3 熔断器:防止连锁故障

五个 cron 任务共享同一个 API key 的时候,一个任务触发 429 会导致其他四个也全部炸。这就是没有熔断的后果。

import threading
from datetime import datetime, timedelta

class CircuitBreaker:
    def __init__(self, failure_threshold=5, recovery_timeout=300):
        self.threshold = failure_threshold
        self.timeout = recovery_timeout  # 秒
        self.failures = 0
        self.last_failure_time = None
        self.state = "CLOSED"  # CLOSED / OPEN / HALF_OPEN
        self._lock = threading.Lock()

    def call(self, func, *args, **kwargs):
        with self._lock:
            if self.state == "OPEN":
                if self._should_attempt_reset():
                    self.state = "HALF_OPEN"
                    print("🔶 熔断器进入半开状态,尝试恢复…")
                else:
                    raise CircuitBreakerOpenError(
                        f"熔断器打开中,{self.timeout}秒后自动恢复"
                    )

        try:
            result = func(*args, **kwargs)
            if self.state == "HALF_OPEN":
                self._reset()
            return result
        except Exception as e:
            self._record_failure()
            raise

    def _record_failure(self):
        self.failures += 1
        self.last_failure_time = datetime.now()
        if self.failures >= self.threshold:
            self.state = "OPEN"
            print(f"🔴 熔断器打开!连续 {self.failures} 次失败,暂停 {self.timeout}秒")

    def _should_attempt_reset(self):
        if self.last_failure_time is None:
            return True
        return datetime.now() - self.last_failure_time > timedelta(seconds=self.timeout)

    def _reset(self):
        self.failures = 0
        self.state = "CLOSED"
        print("✅ 熔断器恢复,状态:CLOSED")

class CircuitBreakerOpenError(Exception):
    pass

用法很简单——把 API 调用包在熔断器里:

breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=300)

def call_llm(prompt):
    return breaker.call(openai_client.chat.completions.create,
                        model="gpt-4", messages=[{"role": "user", "content": prompt}])

一个任务触发熔断后,其他任务看到 OPEN 状态直接快速失败,不会再去撞墙。

3.4 状态持久化:长任务不能断点续传?

Agent 跑 6 小时的任务,中间崩了怎么办?从头再来 = 浪费 token = 烧钱。

我现在的方案是 checkpoint + 增量恢复

import json
import os
from datetime import datetime

class AgentCheckpoint:
    def __init__(self, task_id: str, checkpoint_dir: str = "/tmp/agent_checkpoints"):
        self.task_id = task_id
        self.dir = checkpoint_dir
        os.makedirs(self.dir, exist_ok=True)
        self.path = os.path.join(self.dir, f"{task_id}.json")

    def save(self, step: int, state: dict, context_summary: str):
        """保存当前步骤的状态快照"""
        checkpoint = {
            "task_id": self.task_id,
            "step": step,
            "state": state,
            "context_summary": context_summary,  # 压缩后的上下文摘要
            "timestamp": datetime.now().isoformat(),
            "total_tokens_used": state.get("tokens", 0),
        }
        with open(self.path, "w") as f:
            json.dump(checkpoint, f, indent=2, ensure_ascii=False)

    def load(self) -> dict | None:
        if not os.path.exists(self.path):
            return None
        with open(self.path) as f:
            return json.load(f)

    def can_resume(self) -> bool:
        cp = self.load()
        if cp is None:
            return False
        # 超过 24 小时的 checkpoint 视为过期
        elapsed = datetime.now() - datetime.fromisoformat(cp["timestamp"])
        return elapsed.total_seconds() < 86400

配合 Context 压缩——每 5 步生成一个摘要,重建对话时用摘要 + 最近 3 步的完整对话,token 从 80K 压到 8K。

环境要求:以上代码使用 Python 3.10+(dict | None 联合类型语法),熔断器依赖 threading(标准库,无需额外安装)。

4. 组装起来:一个生产级 Agent 循环

把重试、降级、熔断、checkpoint 串起来:

graph TD
    START[任务开始] --> CK{检查 Checkpoint}
| CK -->|有且有效| RESUME[从断点恢复] |
| CK -->|无或过期| INIT[初始化新任务] |

    RESUME --> EXEC
    INIT --> EXEC

    EXEC[执行当前步骤] --> CB{熔断器检查}
| CB -->|OPEN| WAIT[等待恢复时间] |
    WAIT --> CB
| CB -->|CLOSED/HALF| CALL[调用 LLM] |

    CALL --> RES{响应分类}
| RES -->|成功| SAVE[保存 Checkpoint] |
| RES -->|可重试错误| RETRY[指数退避重试] |
| RES -->|需降级| FALLBACK[切换模型] |

    RETRY --> CB
    FALLBACK --> CB

    SAVE --> CHECK{任务完成?}
| CHECK -->|| EXEC |
| CHECK -->|| DONE[ 任务完成] |

| FALLBACK -->|所有模型耗尽| ALERT[🔔 告警 + 人工介入] |

完整的 Agent runner:

class ReliableAgent:
    def __init__(self, task_id: str, model_chain: ModelFallbackChain):
        self.task_id = task_id
        self.checkpoint = AgentCheckpoint(task_id)
        self.breaker = CircuitBreaker(failure_threshold=5)
        self.model_chain = model_chain

    def run(self, steps: list) -> dict:
        cp = self.checkpoint.load()
        start_idx = cp["step"] + 1 if cp and self.checkpoint.can_resume() else 0

        for i in range(start_idx, len(steps)):
            step = steps[i]
            model = self.model_chain.get_model()

            try:
                result = self._execute_step(step, model)
                self.checkpoint.save(i, {"result": result, "tokens": result.get("usage", {}).get("total_tokens", 0)},
                                     context_summary=self._summarize_context())
            except CircuitBreakerOpenError:
                time.sleep(self.breaker.timeout)
                continue
            except Exception as e:
                # 降级链
                try:
                    model = self.model_chain.fallback(str(e))
                    result = self._execute_step(step, model)
                    self.checkpoint.save(i, {"result": result}, self._summarize_context())
                except RuntimeError:
                    raise RuntimeError(f"步骤 {i} 失败,所有模型不可用")

        return {"status": "completed", "task_id": self.task_id}

    def _execute_step(self, step, model):
        return self.breaker.call(
            lambda: self._call_llm(step, model["name"])
        )

5. 效果对比

部署这套方案前后三个月的对比数据:

指标 优化前(简单重试) 优化后(智能重试+降级+熔断)
日均 API 调用失败次数 47 次 6 次
配额浪费(无效重试消耗) 约 1200 tokens/天 约 80 tokens/天
429 连锁故障次数 月均 12 次 月均 1 次
长任务(>2h)成功率 62% 94%
人工介入频率 每周 3-4 次 每周 0-1 次
模型成本(月) $247 $89

配额浪费从 1200 tokens/天降到 80 tokens/天——降了 93%。这个数字不是我算的,是 API dashboard 上直接拉出来的。

6. 还没解决的问题

说实话,这套方案不完美。三个痛点:

1. 降级链的语义漂移。 Gemini Flash 和 Claude Sonnet 对同一个 prompt 的理解偏差还挺大的。在代码生成场景影响不大,但在需要精确语义理解的任务里(比如财务分析),降级可能导致输出质量断崖。

2. Context 压缩丢信息。 摘要再怎么做,总会丢细节。有一次 Agent 在摘要里把"用户说不要用 pandas"压缩没了,恢复后直接用 pandas 写了一大段,用户脸都绿了。

3. 熔断粒度太粗。 目前是整个 provider 级别的熔断,但实际上同一个 provider 的不同 model 可能互不影响。比如 OpenAI 的 GPT-4 限流了,GPT-4o-mini 可能还正常。

7. 总结

跑 AI Agent 到生产环境,可靠性代码的量大概是业务逻辑的 2-3 倍。不过,想省 token 省钱的话另说。

核心就四件事:
- 智能重试:区分错误类型,不是所有错误都值得重试
- 降级链:至少准备 3 个模型,最后一环是本地模型或人工兜底
- 熔断器:防止多任务互相踩踏
- 状态持久化:长任务必须 checkpoint,每 5 步存一次

这套代码我跑了一个多月了,放在 GitHub 上。如果你也在搞 Agent 生产化,欢迎拿去用,踩了坑评论区说一声——我接着修。


你跑 Agent 遇到过什么离谱的故障?评论区聊聊,我收集起来放到下一篇文章里。

Logo

小龙虾开发者社区是 CSDN 旗下专注 OpenClaw 生态的官方阵地,聚焦技能开发、插件实践与部署教程,为开发者提供可直接落地的方案、工具与交流平台,助力高效构建与落地 AI 应用

更多推荐