src/agents 详细源码解读

OpenClaw AI 智能体核心模块深度解析


📋 目录

  1. 模块概览
  2. 目录结构
  3. 核心组件详解
  4. 关键流程分析
  5. 数据流与状态管理
  6. 工具系统
  7. 技能系统
  8. 记忆与上下文
  9. 错误处理与容错
  10. 性能优化

1. 模块概览

1.1 模块定位

src/agents/ 是 OpenClaw 的核心智能体模块,负责:

  • 🤖 AI 推理引擎:基于 PI Agent 引擎实现对话和任务执行
  • 🔧 工具管理:定义、注册和执行工具
  • 🧠 上下文管理:对话历史的压缩和优化
  • 💾 记忆集成:向量搜索和语义检索
  • 🔌 技能系统:可插拔的功能模块
  • 🛡️ 安全控制:权限验证和策略执行

1.2 核心依赖

// 核心依赖包
@mariozechner/pi-agent-core      // PI Agent 核心库
@mariozechner/pi-coding-agent    // PI 编码智能体
@mariozechner/pi-ai              // PI AI 基础库
@mariozechner/pi-tui             // PI 终端 UI

1.3 设计原则

  1. 分层架构:清晰的职责分离
  2. 插件化:工具和技能可扩展
  3. 流式处理:支持实时响应
  4. 容错设计:完善的错误处理和重试机制
  5. 性能优化:智能的上下文压缩和缓存

2. 目录结构

src/agents/
├── pi-embedded-runner/          # PI 嵌入式运行器(核心)
│   ├── run/                     # 运行逻辑
│   │   ├── attempt.ts           # 单次尝试执行
│   │   ├── params.ts            # 参数定义
│   │   └── payloads.ts          # 载荷构建
│   ├── compact.ts               # 上下文压缩
│   ├── model.ts                 # 模型解析
│   ├── system-prompt.ts         # 系统提示词
│   ├── tool-split.ts            # 工具拆分
│   ├── tool-result-truncation.ts # 工具结果截断
│   ├── types.ts                 # 类型定义
│   └── run.ts                   # 主运行入口
├── tools/                       # 工具系统(90+ 工具)
│   ├── browser-tool.ts          # 浏览器工具
│   ├── web-search.ts            # 网络搜索
│   ├── memory-tool.ts           # 记忆检索
│   ├── sessions-*               # 会话管理工具
│   ├── [platform]-actions.ts    # 平台操作工具
│   └── ...
├── skills/                      # 技能系统
│   ├── config.js                # 技能配置
│   ├── workspace.js             # 工作区技能
│   └── types.js                 # 类型定义
├── subagent-*                   # 子智能体管理
├── auth-profiles.ts             # 认证配置文件
├── compaction.ts                # 上下文压缩
├── context.ts                   # 上下文管理
├── context-window-guard.ts      # 上下文窗口保护
├── failover-error.ts            # 故障转移
├── memory-search.ts             # 记忆搜索
├── model-auth.ts                # 模型认证
├── model-selection.ts           # 模型选择
├── pi-tools.ts                  # PI 工具创建
├── pi-embedded-subscribe.ts     # PI 订阅处理
├── sandbox.ts                   # 沙箱环境
├── session-*                    # 会话管理
└── workspace-*                  # 工作区管理

3. 核心组件详解

3.1 PI 嵌入式运行器(pi-embedded-runner)

3.1.1 架构概述

PI 嵌入式运行器是智能体的核心执行引擎,负责:

用户请求
    ↓
[运行入口] runEmbeddedPiAgent()
    ↓
[参数解析] resolveModel(), resolveExtraParams()
    ↓
[钩子触发] before_agent_start, before_prompt_build
    ↓
[载荷构建] buildEmbeddedRunPayloads()
    ↓
[PI 执行] runEmbeddedAttempt()
    ↓
[流式订阅] subscribeEmbeddedPiSession()
    ↓
[工具执行] executeTools()
    ↓
[上下文压缩] compactEmbeddedPiSession()
    ↓
[结果返回] EmbeddedPiRunResult
3.1.2 主运行入口

文件src/agents/pi-embedded-runner/run.ts

核心函数

export async function runEmbeddedPiAgent(
  options: RunEmbeddedPiAgentParams
): Promise<EmbeddedPiRunResult> {
  // 1. 解析模型和认证
  const model = await resolveModel(options);
  const auth = await resolveModelAuth(model);

  // 2. 触发 before_agent_start 钩子
  const hookResult = await getGlobalHookRunner().emitHook(
    'agent:before_agent_start',
    { agentId: options.agentId, model, config: options.config }
  );

  // 3. 构建运行载荷
  const payloads = await buildEmbeddedRunPayloads({
    ...options,
    model,
    auth,
    hookResult,
  });

  // 4. 执行 PI Agent
  const result = await runEmbeddedAttempt({
    ...options,
    model,
    auth,
    payloads,
  });

  // 5. 处理工具结果截断
  if (sessionLikelyHasOversizedToolResults(result.session)) {
    await truncateOversizedToolResultsInSession(result.session);
  }

  // 6. 触发 after_agent_end 钩子
  await getGlobalHookRunner().emitHook(
    'agent:after_agent_end',
    { result, options }
  );

  return result;
}

关键特性

  1. 故障转移:支持多模型和认证配置文件切换
  2. 钩子集成:在关键生命周期点触发插件钩子
  3. 载荷优化:智能构建最小化的 API 载荷
  4. 结果处理:自动截断过大的工具结果
3.1.3 单次尝试执行

文件src/agents/pi-embedded-runner/run/attempt.ts

核心逻辑

export async function runEmbeddedAttempt(
  params: RunEmbeddedAttemptParams
): Promise<EmbeddedPiRunResult> {
  // 1. 创建会话管理器
  const sessionManager = await createAgentSession({
    apiKey: params.auth.apiKey,
    baseUrl: params.auth.baseUrl,
    model: params.model,
    sessionFile: params.sessionFile,
    resourceLoader: createResourceLoader(params),
  });

  // 2. 保护会话管理器(添加工具结果守卫)
  const guardedSessionManager = guardSessionManager(
    sessionManager,
    params.toolResultGuardConfig
  );

  // 3. 构建流式函数
  const streamFn = createStreamFn(params);

  // 4. 订阅 PI 会话
  const subscription = await subscribeEmbeddedPiSession({
    sessionManager: guardedSessionManager,
    streamFn,
    onBlockReply: params.onBlockReply,
    onToolCall: params.onToolCall,
    onMessage: params.onMessage,
  });

  // 5. 等待完成
  await subscription.waitForEnd();

  // 6. 返回结果
  return {
    payloads: subscription.payloads,
    meta: subscription.meta,
    didSendViaMessagingTool: subscription.didSendViaMessagingTool,
  };
}

关键实现细节

流式函数创建
function createStreamFn(params: RunEmbeddedAttemptParams): StreamFn {
  const provider = normalizeProviderId(params.model.provider);

  // OpenAI WebSocket 流式
  if (isOpenAIProvider(provider)) {
    return createOpenAIWebSocketStreamFn({
      apiKey: params.auth.apiKey,
      baseUrl: params.auth.baseUrl,
      model: params.model.id,
    });
  }

  // Ollama 本地流式
  if (provider === 'ollama') {
    return createConfiguredOllamaStreamFn({
      baseUrl: params.auth.baseUrl,
      model: params.model.id,
    });
  }

  // 默认 HTTP 流式
  return streamSimple({
    apiKey: params.auth.apiKey,
    baseUrl: params.auth.baseUrl,
    model: params.model.id,
  });
}
资源加载器
function createResourceLoader(
  params: RunEmbeddedAttemptParams
): DefaultResourceLoader {
  return new DefaultResourceLoader({
    // 加载 Bootstrap 文件
    loadBootstrap: async () => {
      return resolveBootstrapContextForRun(params);
    },

    // 加载技能
    loadSkills: async () => {
      return buildWorkspaceSkillsPrompt(params.workspaceDir);
    },

    // 加载工具定义
    loadTools: async () => {
      return toClientToolDefinitions(params.tools);
    },
  });
}
3.1.4 上下文压缩

文件src/agents/pi-embedded-runner/compact.ts

压缩策略

export async function compactEmbeddedPiSession(
  session: Session,
  options: CompactOptions
): Promise<EmbeddedPiCompactResult> {
  // 1. 估算当前 token 使用
  const tokensBefore = estimateMessagesTokens(session.history);

  // 2. 检查是否需要压缩
  const contextWindow = resolveContextWindowInfo(session.model);
  const shouldCompact = evaluateContextWindowGuard({
    tokens: tokensBefore,
    window: contextWindow.tokens,
    safetyMargin: options.safetyMargin || 1.2,
  });

  if (!shouldCompact) {
    return { ok: true, compacted: false };
  }

  // 3. 识别保留部分
  const keepSections = identifyKeepSections(session.history, {
    keepRecent: options.recentTurnsToKeep || 2,
    keepUserQuestions: true,
    keepToolCalls: true,
    keepSystemMessages: false,
  });

  // 4. 生成摘要
  const summary = await generateSummary({
    history: session.history,
    exclude: keepSections,
    instructions: buildCompactionSummarizationInstructions(
      options.customInstructions,
      options.identifierPolicy
    ),
  });

  // 5. 构建压缩后的历史
  const compactedHistory = [
    ...keepSections,
    {
      role: 'system',
      content: `以下是对话历史的摘要:\n${summary}`,
    },
  ];

  // 6. 更新会话
  session.history = compactedHistory;
  session.compactionCount = (session.compactionCount || 0) + 1;

  const tokensAfter = estimateMessagesTokens(compactedHistory);

  return {
    ok: true,
    compacted: true,
    result: {
      summary,
      firstKeptEntryId: keepSections[0]?.id,
      tokensBefore,
      tokensAfter,
    },
  };
}

压缩算法

  1. Token 估算:使用 @mariozechner/pi-coding-agentestimateTokens()
  2. 保留策略
    • 保留最近的 N 轮对话(默认 2 轮)
    • 保留用户问题
    • 保留工具调用
    • 移除系统消息
  3. 摘要生成
    • 使用 LLM 生成摘要
    • 保留关键信息:任务状态、决策、TODO、承诺
    • 支持自定义指令
  4. 标识符保护
    • 严格模式:保留所有 UUID、哈希、ID、令牌等
    • 自定义模式:使用用户提供的指令
    • 关闭模式:不保护标识符
3.1.5 系统提示词

文件src/agents/pi-embedded-runner/system-prompt.ts

提示词结构

export async function createSystemPromptOverride(
  params: SystemPromptParams
): Promise<string> {
  const sections: string[] = [];

  // 1. 基础系统提示词
  if (params.baseSystemPrompt) {
    sections.push(params.baseSystemPrompt);
  }

  // 2. 技能提示词
  if (params.skillsPrompt) {
    sections.push(params.skillsPrompt);
  }

  // 3. 工具使用指南
  if (params.tools.length > 0) {
    sections.push(buildToolsUsageGuide(params.tools));
  }

  // 4. 上下文窗口信息
  sections.push(buildContextWindowInfo(params.model));

  // 5. 沙箱环境信息
  if (params.sandbox.enabled) {
    sections.push(buildSandboxInfo(params.sandbox));
  }

  // 6. 平台特定提示词
  if (params.platformHints) {
    sections.push(params.platformHints);
  }

  // 7. 自定义指令
  if (params.customInstructions) {
    sections.push(params.customInstructions);
  }

  return sections.join('\n\n');
}

提示词优化

  1. 动态构建:根据运行时状态动态组装
  2. 分层结构:清晰的章节划分
  3. 上下文感知:包含环境、工具、约束等信息
  4. Token 优化:自动去除冗余内容
3.1.6 工具拆分

文件src/agents/pi-embedded-runner/tool-split.ts

拆分策略

export function splitSdkTools(
  tools: AnyAgentTool[],
  options: SplitOptions
): {
  sdkTools: AgentTool[];
  customTools: AgentTool[];
} {
  const sdkTools: AgentTool[] = [];
  const customTools: AgentTool[] = [];

  for (const tool of tools) {
    // SDK 内置工具(file_browser, bash, python_repl 等)
    if (isSdkTool(tool)) {
      sdkTools.push(toSdkTool(tool));
    }
    // OpenClaw 自定义工具(web_search, memory, browser 等)
    else {
      customTools.push(toCustomTool(tool));
    }
  }

  return { sdkTools, customTools };
}

function isSdkTool(tool: AnyAgentTool): boolean {
  const sdkToolNames = new Set([
    'file_browser',
    'bash',
    'python_repl',
    'text_editor',
  ]);
  return sdkToolNames.has(tool.name);
}

拆分原因

  1. 兼容性:不同 LLM 提供商对工具的支持不同
  2. 性能:减少不必要的工具传输
  3. 安全性:分离高风险工具
  4. 灵活性:便于工具管理
3.1.7 工具结果截断

文件src/agents/pi-embedded-runner/tool-result-truncation.ts

截断策略

export function truncateOversizedToolResultsInSession(
  session: Session,
  options: TruncationOptions
): void {
  const maxChars = options.maxChars || 100_000;

  for (const message of session.history) {
    if (message.role !== 'tool_result') continue;

    const result = message.content as ToolResult;
    if (!result.content) continue;

    // 估算字符数
    const estimatedChars = estimateToolResultChars(result);

    if (estimatedChars > maxChars) {
      // 头尾截断
      const headChars = Math.floor(maxChars / 2);
      const tailChars = maxChars - headChars;

      const originalContent = result.content;
      result.content = [
        originalContent.slice(0, headChars),
        `\n... [截断 ${estimatedChars - maxChars} 字符] ...\n`,
        originalContent.slice(-tailChars),
      ].join('');

      // 记录截断
      result.truncated = true;
      result.originalSize = estimatedChars;
    }
  }
}

function estimateToolResultChars(result: ToolResult): number {
  if (typeof result.content === 'string') {
    return result.content.length;
  }
  if (typeof result.content === 'object') {
    return JSON.stringify(result.content).length;
  }
  return 0;
}

截断策略

  1. 阈值检测:检测超过阈值的工具结果
  2. 头尾保留:保留结果的开头和结尾
  3. 截断标记:添加截断提示信息
  4. 元数据记录:记录原始大小和截断状态

3.2 上下文管理(context.ts)

3.2.1 上下文窗口管理

核心功能

export async function resolveContextWindowInfo(
  model: ModelReference
): Promise<ContextWindowInfo> {
  // 1. 查找模型上下文窗口
  const contextWindow = await findModelContextWindow(model.id);

  // 2. 应用配置覆盖
  const configuredWindow = getConfiguredContextWindow(model.id);
  const effectiveWindow = configuredWindow || contextWindow;

  // 3. 应用安全边际
  const usableWindow = Math.floor(
    effectiveWindow * SAFETY_MARGIN
  );

  return {
    tokens: effectiveWindow,
    usableTokens: usableWindow,
    minTokens: CONTEXT_WINDOW_HARD_MIN_TOKENS,
    warnThreshold: CONTEXT_WINDOW_WARN_BELOW_TOKENS,
  };
}

上下文窗口发现

async function findModelContextWindow(
  modelId: string
): Promise<number> {
  // 1. 检查缓存
  if (MODEL_CACHE.has(modelId)) {
    return MODEL_CACHE.get(modelId)!;
  }

  // 2. 从配置文件加载
  const modelsConfig = await loadModelsConfig();
  const configuredWindow = modelsConfig[modelId]?.contextWindow;

  // 3. 从提供商 API 发现
  if (!configuredWindow) {
    const discoveredWindow = await discoverFromProvider(modelId);
    if (discoveredWindow) {
      MODEL_CACHE.set(modelId, discoveredWindow);
      return discoveredWindow;
    }
  }

  // 4. 使用默认值
  return DEFAULT_CONTEXT_TOKENS;
}

支持的模型

提供商 模型 上下文窗口
Anthropic Claude 3 Opus 200K
Anthropic Claude 3.5 Sonnet 200K
OpenAI GPT-4 Turbo 128K
OpenAI GPT-4o 128K
Google Gemini 1.5 Pro 1M
Google Gemini 1.5 Flash 1M
3.2.2 上下文窗口保护

文件src/agents/context-window-guard.ts

保护机制

export function evaluateContextWindowGuard(
  params: ContextWindowGuardParams
): ContextWindowGuardResult {
  const { tokens, window, safetyMargin } = params;
  const usableWindow = Math.floor(window * safetyMargin);

  // 1. 硬性限制检查
  if (tokens > window) {
    return {
      status: 'overflow',
      action: 'must_compact',
      message: `Token count (${tokens}) exceeds context window (${window})`,
    };
  }

  // 2. 警告阈值检查
  if (tokens > usableWindow) {
    return {
      status: 'warning',
      action: 'should_compact',
      message: `Token count (${tokens}) approaches usable window (${usableWindow})`,
    };
  }

  // 3. 正常状态
  return {
    status: 'ok',
    action: 'none',
    message: 'Token count within safe limits',
  };
}

保护级别

  1. 硬性限制:超过上下文窗口,必须压缩
  2. 警告阈值:接近可用窗口,建议压缩
  3. 安全边际:留出 20% 缓冲空间

3.3 故障转移(failover-error.ts)

3.3.1 错误分类

错误类型

export type FailoverReason =
  | 'auth'                    // 认证失败
  | 'rate_limit'              // 速率限制
  | 'context_overflow'        // 上下文溢出
  | 'compaction_failure'      // 压缩失败
  | 'timeout'                 // 超时
  | 'server_error'            // 服务器错误
  | 'billing'                 // 计费问题
  | 'model_unavailable'       // 模型不可用
  | 'unknown';                // 未知错误

export function classifyFailoverReason(
  error: Error
): FailoverReason {
  // 1. 认证错误
  if (isAuthAssistantError(error)) {
    return 'auth';
  }

  // 2. 速率限制
  if (isRateLimitAssistantError(error)) {
    return 'rate_limit';
  }

  // 3. 上下文溢出
  if (isLikelyContextOverflowError(error)) {
    return 'context_overflow';
  }

  // 4. 压缩失败
  if (isCompactionFailureError(error)) {
    return 'compaction_failure';
  }

  // 5. 超时
  if (isTimeoutErrorMessage(error)) {
    return 'timeout';
  }

  // 6. 计费问题
  if (isBillingAssistantError(error)) {
    return 'billing';
  }

  // 7. 服务器错误
  if (isFailoverErrorMessage(error)) {
    return 'server_error';
  }

  // 8. 未知错误
  return 'unknown';
}
3.3.2 故障转移策略

配置文件认证

export async function resolveAuthProfileOrder(
  model: ModelReference,
  config: OpenClawConfig
): Promise<ResolvedAuthProfile[]> {
  const profiles = config.agents?.defaults?.authProfiles;

  // 1. 获取配置的认证配置文件
  const configuredProfiles = profiles
    ?.filter(p => p.model === model.id || p.model === '*')
    .sort((a, b) => (a.order || 0) - (b.order || 0));

  // 2. 应用冷却策略
  const availableProfiles = configuredProfiles.filter(
    p => !isProfileInCooldown(p.id)
  );

  // 3. 返回可用配置文件
  return availableProfiles.map(p => ({
    id: p.id,
    apiKey: p.apiKey,
    baseUrl: p.baseUrl,
    order: p.order || 0,
  }));
}

故障转移执行

export async function resolveFailoverStatus(
  error: Error,
  params: FailoverParams
): Promise<FailoverStatus> {
  const reason = classifyFailoverReason(error);

  // 1. 标记失败
  if (params.currentProfile) {
    await markAuthProfileFailure(params.currentProfile.id, {
      reason,
      timestamp: Date.now(),
    });
  }

  // 2. 查找下一个可用配置文件
  const nextProfile = await findNextAvailableProfile({
    model: params.model,
    excludeProfileId: params.currentProfile?.id,
  });

  if (nextProfile) {
    return {
      shouldRetry: true,
      nextProfile,
      reason,
      backoffMs: calculateBackoffMs(reason),
    };
  }

  // 3. 没有可用的配置文件
  return {
    shouldRetry: false,
    reason,
    message: 'No available auth profiles',
  };
}

function calculateBackoffMs(reason: FailoverReason): number {
  const backoffPolicies: Record<FailoverReason, BackoffPolicy> = {
    auth: { initialMs: 1000, maxMs: 30000, factor: 2 },
    rate_limit: { initialMs: 5000, maxMs: 60000, factor: 2 },
    context_overflow: { initialMs: 100, maxMs: 1000, factor: 1.5 },
    timeout: { initialMs: 2000, maxMs: 30000, factor: 2 },
    server_error: { initialMs: 1000, maxMs: 10000, factor: 1.5 },
    billing: { initialMs: 0, maxMs: 0, factor: 1 },
    model_unavailable: { initialMs: 0, maxMs: 0, factor: 1 },
    unknown: { initialMs: 2000, maxMs: 30000, factor: 2 },
  };

  const policy = backoffPolicies[reason] || backoffPolicies.unknown;
  return computeBackoff(policy);
}

故障转移特性

  1. 智能分类:准确识别错误类型
  2. 冷却策略:失败的配置文件暂时禁用
  3. 回退算法:指数退避 + 随机抖动
  4. 自动恢复:冷却期后自动恢复配置文件

3.4 模型认证(model-auth.ts)

3.4.1 认证模式

认证类型

export type ModelAuthMode =
  | 'api_key'                 // API 密钥
  | 'oauth'                   // OAuth
  | 'profile'                 // 认证配置文件
  | 'env'                     // 环境变量
  | 'none';                   // 无认证

export async function resolveModelAuthMode(
  model: ModelReference
): Promise<ModelAuthMode> {
  // 1. 检查是否配置了认证配置文件
  if (hasAuthProfiles(model.id)) {
    return 'profile';
  }

  // 2. 检查是否配置了 OAuth
  if (hasOAuthConfig(model.id)) {
    return 'oauth';
  }

  // 3. 检查环境变量
  if (hasEnvApiKey(model.id)) {
    return 'env';
  }

  // 4. 检查 API 密钥
  if (hasApiKey(model.id)) {
    return 'api_key';
  }

  // 5. 无需认证
  return 'none';
}
3.4.2 API 密钥解析

密钥来源

export async function getApiKeyForModel(
  model: ModelReference
): Promise<string | undefined> {
  const provider = normalizeProviderId(model.provider);

  // 1. 从配置文件读取
  const configKey = `agents.defaults.providers.${provider}.apiKey`;
  const configValue = config.get(configKey);
  if (configValue) {
    return resolveSecretInput(configValue);
  }

  // 2. 从环境变量读取
  const envVar = getEnvVarForModel(model);
  if (envVar) {
    return process.env[envVar];
  }

  // 3. 从 SecretRef 解析
  const secretRef = getSecretRefForModel(model);
  if (secretRef) {
    return await resolveSecretInput(secretRef);
  }

  return undefined;
}

function getEnvVarForModel(model: ModelReference): string | undefined {
  const provider = normalizeProviderId(model.provider);
  const envVars: Record<string, string> = {
    openai: 'OPENAI_API_KEY',
    anthropic: 'ANTHROPIC_API_KEY',
    google: 'GOOGLE_API_KEY',
    gemini: 'GEMINI_API_KEY',
    mistral: 'MISTRAL_API_KEY',
    // ...
  };
  return envVars[provider];
}
3.4.3 GitHub Copilot 认证

特殊处理

export async function getGitHubCopilotToken(
  githubToken: string
): Promise<string> {
  // 1. 获取 Copilot 认证
  const auth = await fetch('https://api.github.com/copilot_internal/v2/token/get', {
    headers: {
      'Authorization': `token ${githubToken}`,
      'Accept': 'application/json',
    },
  });

  const data = await auth.json();

  // 2. 提取访问令牌
  const accessToken = data.token;

  // 3. 设置自动刷新
  if (data.expires_at) {
    scheduleTokenRefresh(accessToken, data.expires_at);
  }

  return accessToken;
}

function scheduleTokenRefresh(
  token: string,
  expiresAt: number
): void {
  const refreshMargin = 5 * 60 * 1000; // 5 分钟前刷新
  const refreshTime = expiresAt - refreshMargin;
  const delay = Math.max(refreshTime - Date.now(), 0);

  setTimeout(async () => {
    try {
      const newToken = await getGitHubCopilotToken(githubToken);
      // 更新缓存的令牌
      COPILOT_TOKEN_CACHE.set(token, newToken);
    } catch (error) {
      log.error('Failed to refresh Copilot token', error);
      // 重试
      setTimeout(() => scheduleTokenRefresh(token, expiresAt), 60000);
    }
  }, delay);
}

3.5 模型选择(model-selection.ts)

3.5.1 模型解析

模型标识符

export function normalizeProviderId(
  provider: string | undefined
): string {
  if (!provider) return 'openai';

  const normalized = provider.toLowerCase().trim();

  // 标准化提供商名称
  const aliases: Record<string, string> = {
    'openai-codex': 'openai',
    'chatgpt': 'openai',
    'claude': 'anthropic',
    'gemini': 'google',
    'mistral-ai': 'mistral',
    'qwen': 'alibaba',
    // ...
  };

  return aliases[normalized] || normalized;
}

export function parseModelReference(
  modelId: string
): ModelReference {
  // 格式:provider/model 或仅 model
  const parts = modelId.split('/');

  if (parts.length === 2) {
    return {
      provider: normalizeProviderId(parts[0]),
      id: parts[1],
      fullName: modelId,
    };
  }

  // 默认提供商
  return {
    provider: 'openai',
    id: modelId,
    fullName: `openai/${modelId}`,
  };
}
3.5.2 默认模型选择

选择策略

export async function resolveDefaultModelForAgent(
  agentId: string,
  config: OpenClawConfig
): Promise<ModelReference> {
  // 1. 检查 Agent 特定配置
  const agentConfig = resolveAgentConfig(agentId, config);
  if (agentConfig.model) {
    return parseModelReference(agentConfig.model);
  }

  // 2. 检查全局默认配置
  const globalDefault = config.agents?.defaults?.model;
  if (globalDefault) {
    return parseModelReference(globalDefault);
  }

  // 3. 使用内置默认值
  return {
    provider: DEFAULT_PROVIDER,
    id: DEFAULT_MODEL,
    fullName: `${DEFAULT_PROVIDER}/${DEFAULT_MODEL}`,
  };
}
3.5.3 模型能力检测

功能支持

export async function supportsModelTools(
  model: ModelReference
): Promise<boolean> {
  // 1. 检查提供商
  const provider = normalizeProviderId(model.provider);

  // 2. 已知支持工具的提供商
  const toolSupportProviders = new Set([
    'openai',
    'anthropic',
    'google',
    'mistral',
    'ollama',
  ]);

  if (!toolSupportProviders.has(provider)) {
    return false;
  }

  // 3. 检查特定模型
  const unsupportedModels = new Set([
    'gpt-3.5-turbo-instruct',
    'text-davinci-003',
    // ...
  ]);

  if (unsupportedModels.has(model.id)) {
    return false;
  }

  return true;
}

export async function supportsModelImages(
  model: ModelReference
): Promise<boolean> {
  // 1. 检查提供商
  const provider = normalizeProviderId(model.provider);

  // 2. 已知支持图像的提供商
  const imageSupportProviders = new Set([
    'openai',
    'anthropic',
    'google',
  ]);

  if (!imageSupportProviders.has(provider)) {
    return false;
  }

  // 3. 检查特定模型
  const imageCapableModels = new Set([
    'gpt-4-vision-preview',
    'gpt-4o',
    'claude-3-opus',
    'claude-3.5-sonnet',
    'gemini-1.5-pro',
    'gemini-1.5-flash',
  ]);

  return imageCapableModels.has(model.id);
}

4. 关键流程分析

4.1 完整的智能体执行流程

用户请求

runEmbeddedPiAgent

解析模型

解析认证

触发 before_agent_start 钩子

构建运行载荷

runEmbeddedAttempt

创建会话管理器

保护会话管理器

创建流式函数

subscribeEmbeddedPiSession

流式响应

有工具调用?

执行工具

继续响应

触发 before_tool_call 钩子

执行工具逻辑

触发 after_tool_call 钩子

返回工具结果

需要压缩?

compactEmbeddedPiSession

继续对话

完成?

返回结果

触发 after_agent_end 钩子

返回 EmbeddedPiRunResult

4.2 工具执行流程

LLM 返回工具调用

解析工具调用

查找工具定义

验证参数

触发 before_tool_call 钩子

检查权限

权限允许?

返回权限错误

执行工具

捕获结果

触发 after_tool_call 钩子

格式化结果

返回给 LLM

LLM 继续对话

4.3 上下文压缩流程

检测 token 使用

超过阈值?

无需压缩

识别保留部分

生成摘要

构建压缩历史

更新会话

增加压缩计数

返回压缩结果


5. 数据流与状态管理

5.1 会话状态

会话数据结构

interface Session {
  id: string;
  history: AgentMessage[];
  compactionCount?: number;
  createdAt: number;
  updatedAt: number;

  // 上下文信息
  contextWindow: ContextWindowInfo;
  model: ModelReference;

  // 运行时状态
  status: 'idle' | 'running' | 'aborted';
  currentRun?: EmbeddedPiRunMeta;

  // 统计信息
  stats: {
    totalTurns: number;
    totalTokens: number;
    totalCost: number;
  };
}

5.2 消息流

消息类型

type AgentMessage =
  | UserMessage
  | AssistantMessage
  | SystemMessage
  | ToolResultMessage;

interface UserMessage {
  role: 'user';
  content: string | ContentBlock[];
  timestamp: number;
  metadata?: UserMessageMetadata;
}

interface AssistantMessage {
  role: 'assistant';
  content: string | ContentBlock[];
  timestamp: number;
  toolCalls?: ToolCall[];
  metadata?: AssistantMessageMetadata;
}

interface ToolResultMessage {
  role: 'tool_result';
  toolCallId: string;
  content: string | ToolResult;
  isError?: boolean;
  truncated?: boolean;
}

5.3 状态持久化

会话文件格式

{
  "id": "session-uuid",
  "agentId": "main",
  "history": [
    {
      "role": "user",
      "content": "用户消息",
      "timestamp": 1234567890
    },
    {
      "role": "assistant",
      "content": "助手回复",
      "timestamp": 1234567891
    }
  ],
  "compactionCount": 3,
  "contextWindow": {
    "tokens": 200000,
    "usableTokens": 160000
  },
  "model": {
    "provider": "anthropic",
    "id": "claude-3.5-sonnet"
  },
  "stats": {
    "totalTurns": 10,
    "totalTokens": 15000
  }
}

6. 工具系统

6.1 工具架构

工具定义

interface AnyAgentTool {
  name: string;
  description: string;
  parameters: ToolParameters;
  handler?: ToolHandler;
  permissions?: ToolPermissions;
  category?: ToolCategory;
}

interface ToolParameters {
  type: 'object';
  properties: Record<string, ParameterDefinition>;
  required: string[];
}

interface ParameterDefinition {
  type: string;
  description: string;
  enum?: string[];
  default?: unknown;
}

6.2 工具分类

90+ 工具分类

类别 工具 数量
文件操作 file_browser, text_editor, read, write 5
执行环境 bash, python_repl, node_exec 3
网络工具 web_search, web_fetch, browser 3
会话管理 sessions_list, sessions_history, sessions_send 5
记忆系统 memory_search, memory_add 2
平台操作 telegram_actions, slack_actions, discord_actions 15+
系统工具 cron_add, gateway_status, agents_list 8
媒体处理 image_tool, pdf_tool, tts_tool 3
其他 canvas, nodes, subagents 5

6.3 工具创建

工具创建流程

export function createOpenClawCodingTools(
  params: CreateToolsParams
): AnyAgentTool[] {
  const tools: AnyAgentTool[] = [];

  // 1. 添加 SDK 内置工具
  tools.push(...codingTools);

  // 2. 添加文件操作工具
  tools.push(
    createHostWorkspaceReadTool(params),
    createHostWorkspaceWriteTool(params),
    createHostWorkspaceEditTool(params)
  );

  // 3. 添加执行工具
  tools.push(
    createExecTool(params.execDefaults),
    createProcessTool(params.processDefaults)
  );

  // 4. 添加网络工具
  tools.push(
    createWebSearchTool(params),
    createWebFetchTool(params)
  );

  // 5. 添加记忆工具
  tools.push(
    createMemorySearchTool(params.memoryConfig)
  );

  // 6. 添加平台工具
  tools.push(...listChannelAgentTools(params));

  // 7. 应用权限策略
  return applyToolPolicyPipeline(tools, params);
}

6.4 工具权限

权限系统

interface ToolPermissions {
  allow?: string[];
  deny?: string[];
  ownerOnly?: boolean;
  requireAuth?: boolean;
}

function applyToolPolicyPipeline(
  tools: AnyAgentTool[],
  params: ToolPolicyParams
): AnyAgentTool[] {
  let filtered = tools;

  // 1. 应用配置文件策略
  filtered = resolveToolProfilePolicy(filtered, params.profile);

  // 2. 应用所有者策略
  filtered = applyOwnerOnlyToolPolicy(filtered, params.owner);

  // 3. 应用群组策略
  filtered = resolveGroupToolPolicy(filtered, params.groupPolicy);

  // 4. 应用子智能体策略
  filtered = resolveSubagentToolPolicy(filtered, params.subagentDepth);

  // 5. 应用消息提供者策略
  filtered = applyMessageProviderToolPolicy(filtered, params.messageProvider);

  return filtered;
}

6.5 工具执行

执行流程

async function executeTool(
  toolCall: ToolCall,
  context: ExecutionContext
): Promise<ToolResult> {
  // 1. 查找工具
  const tool = findTool(toolCall.name);
  if (!tool) {
    throw new Error(`Tool not found: ${toolCall.name}`);
  }

  // 2. 验证参数
  const params = validateToolParameters(tool, toolCall.arguments);

  // 3. 检查权限
  const permissions = checkToolPermissions(tool, context);
  if (!permissions.allowed) {
    return {
      isError: true,
      content: `Permission denied: ${permissions.reason}`,
    };
  }

  // 4. 触发 before_tool_call 钩子
  await getGlobalHookRunner().emitHook('tool:before_tool_call', {
    tool: tool.name,
    params,
    context,
  });

  // 5. 执行工具
  let result: unknown;
  try {
    result = await tool.handler(params, context);
  } catch (error) {
    result = {
      isError: true,
      content: error.message,
    };
  }

  // 6. 触发 after_tool_call 钩子
  await getGlobalHookRunner().emitHook('tool:after_tool_call', {
    tool: tool.name,
    params,
    result,
    context,
  });

  // 7. 格式化结果
  return formatToolResult(result);
}

7. 技能系统

7.1 技能架构

技能定义

interface OpenClawSkillMetadata {
  id: string;
  name: string;
  description: string;
  version: string;
  permissions: string[];
  dependencies?: string[];
  binaries?: string[];
}

interface SkillEntry {
  id: string;
  metadata: OpenClawSkillMetadata;
  path: string;
  enabled: boolean;
}

7.2 技能加载

加载流程

export async function loadWorkspaceSkillEntries(
  workspaceDir: string
): Promise<SkillEntry[]> {
  // 1. 扫描技能目录
  const skillDirs = await scanSkillDirectories(workspaceDir);

  // 2. 加载技能元数据
  const skills: SkillEntry[] = [];
  for (const skillDir of skillDirs) {
    const metadata = await loadSkillMetadata(skillDir);
    if (!metadata) continue;

    // 3. 检查依赖
    const depsMet = await checkSkillDependencies(metadata);
    if (!depsMet) continue;

    // 4. 检查二进制文件
    const binariesAvailable = await checkSkillBinaries(metadata);
    if (!binariesAvailable) continue;

    skills.push({
      id: metadata.id,
      metadata,
      path: skillDir,
      enabled: true,
    });
  }

  return skills;
}

7.3 技能提示词

提示词构建

export async function buildWorkspaceSkillsPrompt(
  workspaceDir: string
): Promise<string | undefined> {
  // 1. 加载技能
  const skills = await loadWorkspaceSkillEntries(workspaceDir);

  if (skills.length === 0) {
    return undefined;
  }

  // 2. 过滤启用技能
  const enabledSkills = skills.filter(s => s.enabled);

  // 3. 构建提示词
  const sections: string[] = [
    '# Available Skills',
    '',
    'You have access to the following skills:',
    '',
  ];

  for (const skill of enabledSkills) {
    sections.push(`## ${skill.metadata.name}`);
    sections.push(skill.metadata.description);

    if (skill.metadata.permissions?.length > 0) {
      sections.push('');
      sections.push(`**Permissions:** ${skill.metadata.permissions.join(', ')}`);
    }

    sections.push('');
  }

  return sections.join('\n');
}

8. 记忆与上下文

8.1 记忆搜索配置

配置结构

export type ResolvedMemorySearchConfig = {
  enabled: boolean;
  sources: Array<"memory" | "sessions">;
  provider: "openai" | "local" | "gemini" | "voyage" | "mistral" | "ollama" | "auto";
  model: string;
  chunking: {
    tokens: number;
    overlap: number;
  };
  query: {
    maxResults: number;
    minScore: number;
    hybridAlpha: number;
  };
  store: {
    driver: "sqlite";
    path: string;
    vector: {
      enabled: boolean;
      extensionPath?: string;
    };
  };
  sync: {
    onSessionStart: boolean;
    onSearch: boolean;
    watch: boolean;
  };
};

8.2 记忆搜索流程

搜索执行

export async function searchMemories(
  query: string,
  options: SearchOptions
): Promise<MemorySearchResult[]> {
  // 1. 查询扩展
  const expandedQueries = await expandQuery(query, options);

  // 2. 向量搜索
  const vectorResults = await searchVector({
    queries: expandedQueries,
    limit: options.maxResults,
  });

  // 3. 关键词搜索
  const keywordResults = await searchKeyword({
    queries: expandedQueries,
    limit: options.maxResults,
  });

  // 4. 混合结果
  const hybridResults = mergeHybridResults({
    vector: vectorResults,
    keyword: keywordResults,
    alpha: options.hybridAlpha || 0.7,
  });

  // 5. 去重和排序
  return deduplicateAndRank(hybridResults);
}

8.3 记忆同步

同步策略

export async function syncSessionToMemory(
  session: Session,
  options: SyncOptions
): Promise<void> {
  // 1. 提取对话历史
  const turns = session.history.filter(
    m => m.role === 'user' || m.role === 'assistant'
  );

  // 2. 分块
  const chunks = await chunkConversation(turns, {
    maxTokens: options.chunking.tokens,
    overlap: options.chunking.overlap,
  });

  // 3. 向量化
  const embeddings = await embedChunks(chunks, options);

  // 4. 存储到向量数据库
  await storeEmbeddings(embeddings, options);

  // 5. 更新索引
  await updateMemoryIndex(session.id, options);
}

9. 错误处理与容错

9.1 错误分类

错误类型层次

Error
├── AgentError
│   ├── AuthError
│   ├── RateLimitError
│   ├── ContextOverflowError
│   ├── CompactionFailureError
│   └── TimeoutError
├── ToolError
│   ├── ToolNotFoundError
│   ├── ToolPermissionError
│   └── ToolExecutionError
└── SystemError
    ├── ConfigurationError
    ├── DependencyError
    └── FileSystemError

9.2 重试策略

重试配置

interface RetryConfig {
  maxAttempts: number;
  initialDelayMs: number;
  maxDelayMs: number;
  backoffFactor: number;
  jitter: number;
  retryableErrors: string[];
}

const DEFAULT_RETRY_CONFIG: RetryConfig = {
  maxAttempts: 3,
  initialDelayMs: 1000,
  maxDelayMs: 30000,
  backoffFactor: 2,
  jitter: 0.2,
  retryableErrors: [
    'ECONNRESET',
    'ETIMEDOUT',
    'ENOTFOUND',
    'ECONNREFUSED',
  ],
};

export async function retryAsync<T>(
  fn: () => Promise<T>,
  config: RetryConfig = DEFAULT_RETRY_CONFIG
): Promise<T> {
  let lastError: Error | undefined;

  for (let attempt = 0; attempt < config.maxAttempts; attempt++) {
    try {
      return await fn();
    } catch (error) {
      lastError = error as Error;

      // 检查是否可重试
      if (!isRetryableError(error, config)) {
        throw error;
      }

      // 计算延迟
      const delay = calculateRetryDelay(attempt, config);
      await sleep(delay);
    }
  }

  throw lastError;
}

9.3 断路器模式

断路器实现

class CircuitBreaker {
  private failures: number = 0;
  private lastFailureTime: number = 0;
  private state: 'closed' | 'open' | 'half-open' = 'closed';

  constructor(
    private threshold: number = 5,
    private timeoutMs: number = 60000
  ) {}

  async execute<T>(fn: () => Promise<T>): Promise<T> {
    if (this.state === 'open') {
      if (this.shouldAttemptReset()) {
        this.state = 'half-open';
      } else {
        throw new Error('Circuit breaker is open');
      }
    }

    try {
      const result = await fn();
      this.onSuccess();
      return result;
    } catch (error) {
      this.onFailure();
      throw error;
    }
  }

  private onSuccess(): void {
    this.failures = 0;
    this.state = 'closed';
  }

  private onFailure(): void {
    this.failures++;
    this.lastFailureTime = Date.now();

    if (this.failures >= this.threshold) {
      this.state = 'open';
    }
  }

  private shouldAttemptReset(): boolean {
    return Date.now() - this.lastFailureTime > this.timeoutMs;
  }
}

10. 性能优化

10.1 缓存策略

多级缓存

class MultiLevelCache {
  private memoryCache: Map<string, CacheEntry>;
  private diskCache: DiskCache;

  async get<T>(key: string): Promise<T | undefined> {
    // 1. 内存缓存
    const memEntry = this.memoryCache.get(key);
    if (memEntry && !memEntry.expired) {
      return memEntry.value as T;
    }

    // 2. 磁盘缓存
    const diskEntry = await this.diskCache.get<T>(key);
    if (diskEntry) {
      // 回填内存缓存
      this.memoryCache.set(key, {
        value: diskEntry,
        expires: Date.now() + 3600000, // 1 小时
      });
      return diskEntry;
    }

    return undefined;
  }

  async set<T>(key: string, value: T, ttl: number): Promise<void> {
    // 写入内存缓存
    this.memoryCache.set(key, {
      value,
      expires: Date.now() + ttl,
    });

    // 写入磁盘缓存
    await this.diskCache.set(key, value, ttl);
  }
}

10.2 流式处理优化

流式优化

export async function processStream<T, R>(
  stream: AsyncIterable<T>,
  processor: (item: T) => Promise<R>,
  options: StreamOptions
): Promise<R[]> {
  const results: R[] = [];
  const concurrent = options.concurrent || 1;
  const queue: Promise<R>[] = [];

  for await (const item of stream) {
    // 添加到队列
    const promise = processor(item);
    queue.push(promise);

    // 限制并发
    if (queue.length >= concurrent) {
      const result = await Promise.race(queue);
      results.push(result);
      queue.splice(queue.indexOf(promise), 1);
    }
  }

  // 等待剩余任务
  const remaining = await Promise.all(queue);
  results.push(...remaining);

  return results;
}

10.3 Token 估算优化

快速估算

class TokenEstimator {
  private cache: Map<string, number>;

  constructor() {
    this.cache = new Map();
  }

  estimate(text: string): number {
    // 1. 检查缓存
    const cached = this.cache.get(text);
    if (cached !== undefined) {
      return cached;
    }

    // 2. 快速估算(基于字符数)
    const fastEstimate = Math.ceil(text.length / 4);

    // 3. 更新缓存
    if (text.length < 1000) {
      this.cache.set(text, fastEstimate);
    }

    return fastEstimate;
  }

  async estimateAccurate(text: string): Promise<number> {
    // 1. 检查缓存
    const cached = this.cache.get(text);
    if (cached !== undefined) {
      return cached;
    }

    // 2. 使用 PI 库精确估算
    const accurate = await estimateTokens(text);

    // 3. 更新缓存
    this.cache.set(text, accurate);

    return accurate;
  }
}

10.4 批处理优化

批量向量化

export async function batchEmbed<T>(
  items: T[],
  embedFn: (item: T) => Promise<number[]>,
  options: BatchOptions
): Promise<number[][]> {
  const { batchSize = 10, concurrency = 3 } = options;
  const results: number[][] = [];

  // 分批处理
  for (let i = 0; i < items.length; i += batchSize) {
    const batch = items.slice(i, i + batchSize);

    // 并发处理批次
    const batchPromises = batch.map(item => embedFn(item));
    const batchResults = await Promise.all(batchPromises);

    results.push(...batchResults);

    // 限制并发批次
    if ((i / batchSize + 1) % concurrency === 0) {
      await sleep(100); // 避免速率限制
    }
  }

  return results;
}

总结

src/agents/ 模块是 OpenClaw 的核心,实现了完整的 AI 智能体功能。主要特点包括:

核心优势

  1. 模块化设计:清晰的职责分离,易于维护和扩展
  2. 高性能:流式处理、缓存优化、批量处理
  3. 容错性强:完善的错误处理和重试机制
  4. 可扩展:插件化的工具和技能系统
  5. 安全可靠:权限控制和策略执行

技术亮点

  • PI Agent 引擎:基于成熟的 AI 框架
  • 智能压缩:自动优化上下文使用
  • 故障转移:多认证配置文件自动切换
  • 工具系统:90+ 内置工具,易于扩展
  • 记忆系统:向量搜索和语义检索

学习建议

  1. 从主流程入手:理解 runEmbeddedPiAgent 的完整流程
  2. 深入关键组件:学习压缩、故障转移、工具执行
  3. 实践扩展:尝试添加自定义工具和技能
  4. 阅读测试:通过测试用例理解边界情况

希望这份详细的源码解读能帮助您深入理解 OpenClaw 的智能体实现!🎉

Logo

小龙虾开发者社区是 CSDN 旗下专注 OpenClaw 生态的官方阵地,聚焦技能开发、插件实践与部署教程,为开发者提供可直接落地的方案、工具与交流平台,助力高效构建与落地 AI 应用

更多推荐