OpenClaw src/agents 详细源码解读
是 OpenClaw 的核心智能体模块,负责:1.3 设计原则分层架构:清晰的职责分离插件化:工具和技能可扩展流式处理:支持实时响应容错设计:完善的错误处理和重试机制性能优化:智能的上下文压缩和缓存2. 目录结构3. 核心组件详解3.1 PI 嵌入式运行器(pi-embedded-runner)3.1.1 架构概述PI 嵌入式运行器是智能体的核心执行引擎,负责:3.1.2 主运行入口文件:核心函数
src/agents 详细源码解读
OpenClaw AI 智能体核心模块深度解析
📋 目录
1. 模块概览
1.1 模块定位
src/agents/ 是 OpenClaw 的核心智能体模块,负责:
- 🤖 AI 推理引擎:基于 PI Agent 引擎实现对话和任务执行
- 🔧 工具管理:定义、注册和执行工具
- 🧠 上下文管理:对话历史的压缩和优化
- 💾 记忆集成:向量搜索和语义检索
- 🔌 技能系统:可插拔的功能模块
- 🛡️ 安全控制:权限验证和策略执行
1.2 核心依赖
// 核心依赖包
@mariozechner/pi-agent-core // PI Agent 核心库
@mariozechner/pi-coding-agent // PI 编码智能体
@mariozechner/pi-ai // PI AI 基础库
@mariozechner/pi-tui // PI 终端 UI
1.3 设计原则
- 分层架构:清晰的职责分离
- 插件化:工具和技能可扩展
- 流式处理:支持实时响应
- 容错设计:完善的错误处理和重试机制
- 性能优化:智能的上下文压缩和缓存
2. 目录结构
src/agents/
├── pi-embedded-runner/ # PI 嵌入式运行器(核心)
│ ├── run/ # 运行逻辑
│ │ ├── attempt.ts # 单次尝试执行
│ │ ├── params.ts # 参数定义
│ │ └── payloads.ts # 载荷构建
│ ├── compact.ts # 上下文压缩
│ ├── model.ts # 模型解析
│ ├── system-prompt.ts # 系统提示词
│ ├── tool-split.ts # 工具拆分
│ ├── tool-result-truncation.ts # 工具结果截断
│ ├── types.ts # 类型定义
│ └── run.ts # 主运行入口
├── tools/ # 工具系统(90+ 工具)
│ ├── browser-tool.ts # 浏览器工具
│ ├── web-search.ts # 网络搜索
│ ├── memory-tool.ts # 记忆检索
│ ├── sessions-* # 会话管理工具
│ ├── [platform]-actions.ts # 平台操作工具
│ └── ...
├── skills/ # 技能系统
│ ├── config.js # 技能配置
│ ├── workspace.js # 工作区技能
│ └── types.js # 类型定义
├── subagent-* # 子智能体管理
├── auth-profiles.ts # 认证配置文件
├── compaction.ts # 上下文压缩
├── context.ts # 上下文管理
├── context-window-guard.ts # 上下文窗口保护
├── failover-error.ts # 故障转移
├── memory-search.ts # 记忆搜索
├── model-auth.ts # 模型认证
├── model-selection.ts # 模型选择
├── pi-tools.ts # PI 工具创建
├── pi-embedded-subscribe.ts # PI 订阅处理
├── sandbox.ts # 沙箱环境
├── session-* # 会话管理
└── workspace-* # 工作区管理
3. 核心组件详解
3.1 PI 嵌入式运行器(pi-embedded-runner)
3.1.1 架构概述
PI 嵌入式运行器是智能体的核心执行引擎,负责:
用户请求
↓
[运行入口] runEmbeddedPiAgent()
↓
[参数解析] resolveModel(), resolveExtraParams()
↓
[钩子触发] before_agent_start, before_prompt_build
↓
[载荷构建] buildEmbeddedRunPayloads()
↓
[PI 执行] runEmbeddedAttempt()
↓
[流式订阅] subscribeEmbeddedPiSession()
↓
[工具执行] executeTools()
↓
[上下文压缩] compactEmbeddedPiSession()
↓
[结果返回] EmbeddedPiRunResult
3.1.2 主运行入口
文件:src/agents/pi-embedded-runner/run.ts
核心函数:
export async function runEmbeddedPiAgent(
options: RunEmbeddedPiAgentParams
): Promise<EmbeddedPiRunResult> {
// 1. 解析模型和认证
const model = await resolveModel(options);
const auth = await resolveModelAuth(model);
// 2. 触发 before_agent_start 钩子
const hookResult = await getGlobalHookRunner().emitHook(
'agent:before_agent_start',
{ agentId: options.agentId, model, config: options.config }
);
// 3. 构建运行载荷
const payloads = await buildEmbeddedRunPayloads({
...options,
model,
auth,
hookResult,
});
// 4. 执行 PI Agent
const result = await runEmbeddedAttempt({
...options,
model,
auth,
payloads,
});
// 5. 处理工具结果截断
if (sessionLikelyHasOversizedToolResults(result.session)) {
await truncateOversizedToolResultsInSession(result.session);
}
// 6. 触发 after_agent_end 钩子
await getGlobalHookRunner().emitHook(
'agent:after_agent_end',
{ result, options }
);
return result;
}
关键特性:
- 故障转移:支持多模型和认证配置文件切换
- 钩子集成:在关键生命周期点触发插件钩子
- 载荷优化:智能构建最小化的 API 载荷
- 结果处理:自动截断过大的工具结果
3.1.3 单次尝试执行
文件:src/agents/pi-embedded-runner/run/attempt.ts
核心逻辑:
export async function runEmbeddedAttempt(
params: RunEmbeddedAttemptParams
): Promise<EmbeddedPiRunResult> {
// 1. 创建会话管理器
const sessionManager = await createAgentSession({
apiKey: params.auth.apiKey,
baseUrl: params.auth.baseUrl,
model: params.model,
sessionFile: params.sessionFile,
resourceLoader: createResourceLoader(params),
});
// 2. 保护会话管理器(添加工具结果守卫)
const guardedSessionManager = guardSessionManager(
sessionManager,
params.toolResultGuardConfig
);
// 3. 构建流式函数
const streamFn = createStreamFn(params);
// 4. 订阅 PI 会话
const subscription = await subscribeEmbeddedPiSession({
sessionManager: guardedSessionManager,
streamFn,
onBlockReply: params.onBlockReply,
onToolCall: params.onToolCall,
onMessage: params.onMessage,
});
// 5. 等待完成
await subscription.waitForEnd();
// 6. 返回结果
return {
payloads: subscription.payloads,
meta: subscription.meta,
didSendViaMessagingTool: subscription.didSendViaMessagingTool,
};
}
关键实现细节:
流式函数创建
function createStreamFn(params: RunEmbeddedAttemptParams): StreamFn {
const provider = normalizeProviderId(params.model.provider);
// OpenAI WebSocket 流式
if (isOpenAIProvider(provider)) {
return createOpenAIWebSocketStreamFn({
apiKey: params.auth.apiKey,
baseUrl: params.auth.baseUrl,
model: params.model.id,
});
}
// Ollama 本地流式
if (provider === 'ollama') {
return createConfiguredOllamaStreamFn({
baseUrl: params.auth.baseUrl,
model: params.model.id,
});
}
// 默认 HTTP 流式
return streamSimple({
apiKey: params.auth.apiKey,
baseUrl: params.auth.baseUrl,
model: params.model.id,
});
}
资源加载器
function createResourceLoader(
params: RunEmbeddedAttemptParams
): DefaultResourceLoader {
return new DefaultResourceLoader({
// 加载 Bootstrap 文件
loadBootstrap: async () => {
return resolveBootstrapContextForRun(params);
},
// 加载技能
loadSkills: async () => {
return buildWorkspaceSkillsPrompt(params.workspaceDir);
},
// 加载工具定义
loadTools: async () => {
return toClientToolDefinitions(params.tools);
},
});
}
3.1.4 上下文压缩
文件:src/agents/pi-embedded-runner/compact.ts
压缩策略:
export async function compactEmbeddedPiSession(
session: Session,
options: CompactOptions
): Promise<EmbeddedPiCompactResult> {
// 1. 估算当前 token 使用
const tokensBefore = estimateMessagesTokens(session.history);
// 2. 检查是否需要压缩
const contextWindow = resolveContextWindowInfo(session.model);
const shouldCompact = evaluateContextWindowGuard({
tokens: tokensBefore,
window: contextWindow.tokens,
safetyMargin: options.safetyMargin || 1.2,
});
if (!shouldCompact) {
return { ok: true, compacted: false };
}
// 3. 识别保留部分
const keepSections = identifyKeepSections(session.history, {
keepRecent: options.recentTurnsToKeep || 2,
keepUserQuestions: true,
keepToolCalls: true,
keepSystemMessages: false,
});
// 4. 生成摘要
const summary = await generateSummary({
history: session.history,
exclude: keepSections,
instructions: buildCompactionSummarizationInstructions(
options.customInstructions,
options.identifierPolicy
),
});
// 5. 构建压缩后的历史
const compactedHistory = [
...keepSections,
{
role: 'system',
content: `以下是对话历史的摘要:\n${summary}`,
},
];
// 6. 更新会话
session.history = compactedHistory;
session.compactionCount = (session.compactionCount || 0) + 1;
const tokensAfter = estimateMessagesTokens(compactedHistory);
return {
ok: true,
compacted: true,
result: {
summary,
firstKeptEntryId: keepSections[0]?.id,
tokensBefore,
tokensAfter,
},
};
}
压缩算法:
- Token 估算:使用
@mariozechner/pi-coding-agent的estimateTokens() - 保留策略:
- 保留最近的 N 轮对话(默认 2 轮)
- 保留用户问题
- 保留工具调用
- 移除系统消息
- 摘要生成:
- 使用 LLM 生成摘要
- 保留关键信息:任务状态、决策、TODO、承诺
- 支持自定义指令
- 标识符保护:
- 严格模式:保留所有 UUID、哈希、ID、令牌等
- 自定义模式:使用用户提供的指令
- 关闭模式:不保护标识符
3.1.5 系统提示词
文件:src/agents/pi-embedded-runner/system-prompt.ts
提示词结构:
export async function createSystemPromptOverride(
params: SystemPromptParams
): Promise<string> {
const sections: string[] = [];
// 1. 基础系统提示词
if (params.baseSystemPrompt) {
sections.push(params.baseSystemPrompt);
}
// 2. 技能提示词
if (params.skillsPrompt) {
sections.push(params.skillsPrompt);
}
// 3. 工具使用指南
if (params.tools.length > 0) {
sections.push(buildToolsUsageGuide(params.tools));
}
// 4. 上下文窗口信息
sections.push(buildContextWindowInfo(params.model));
// 5. 沙箱环境信息
if (params.sandbox.enabled) {
sections.push(buildSandboxInfo(params.sandbox));
}
// 6. 平台特定提示词
if (params.platformHints) {
sections.push(params.platformHints);
}
// 7. 自定义指令
if (params.customInstructions) {
sections.push(params.customInstructions);
}
return sections.join('\n\n');
}
提示词优化:
- 动态构建:根据运行时状态动态组装
- 分层结构:清晰的章节划分
- 上下文感知:包含环境、工具、约束等信息
- Token 优化:自动去除冗余内容
3.1.6 工具拆分
文件:src/agents/pi-embedded-runner/tool-split.ts
拆分策略:
export function splitSdkTools(
tools: AnyAgentTool[],
options: SplitOptions
): {
sdkTools: AgentTool[];
customTools: AgentTool[];
} {
const sdkTools: AgentTool[] = [];
const customTools: AgentTool[] = [];
for (const tool of tools) {
// SDK 内置工具(file_browser, bash, python_repl 等)
if (isSdkTool(tool)) {
sdkTools.push(toSdkTool(tool));
}
// OpenClaw 自定义工具(web_search, memory, browser 等)
else {
customTools.push(toCustomTool(tool));
}
}
return { sdkTools, customTools };
}
function isSdkTool(tool: AnyAgentTool): boolean {
const sdkToolNames = new Set([
'file_browser',
'bash',
'python_repl',
'text_editor',
]);
return sdkToolNames.has(tool.name);
}
拆分原因:
- 兼容性:不同 LLM 提供商对工具的支持不同
- 性能:减少不必要的工具传输
- 安全性:分离高风险工具
- 灵活性:便于工具管理
3.1.7 工具结果截断
文件:src/agents/pi-embedded-runner/tool-result-truncation.ts
截断策略:
export function truncateOversizedToolResultsInSession(
session: Session,
options: TruncationOptions
): void {
const maxChars = options.maxChars || 100_000;
for (const message of session.history) {
if (message.role !== 'tool_result') continue;
const result = message.content as ToolResult;
if (!result.content) continue;
// 估算字符数
const estimatedChars = estimateToolResultChars(result);
if (estimatedChars > maxChars) {
// 头尾截断
const headChars = Math.floor(maxChars / 2);
const tailChars = maxChars - headChars;
const originalContent = result.content;
result.content = [
originalContent.slice(0, headChars),
`\n... [截断 ${estimatedChars - maxChars} 字符] ...\n`,
originalContent.slice(-tailChars),
].join('');
// 记录截断
result.truncated = true;
result.originalSize = estimatedChars;
}
}
}
function estimateToolResultChars(result: ToolResult): number {
if (typeof result.content === 'string') {
return result.content.length;
}
if (typeof result.content === 'object') {
return JSON.stringify(result.content).length;
}
return 0;
}
截断策略:
- 阈值检测:检测超过阈值的工具结果
- 头尾保留:保留结果的开头和结尾
- 截断标记:添加截断提示信息
- 元数据记录:记录原始大小和截断状态
3.2 上下文管理(context.ts)
3.2.1 上下文窗口管理
核心功能:
export async function resolveContextWindowInfo(
model: ModelReference
): Promise<ContextWindowInfo> {
// 1. 查找模型上下文窗口
const contextWindow = await findModelContextWindow(model.id);
// 2. 应用配置覆盖
const configuredWindow = getConfiguredContextWindow(model.id);
const effectiveWindow = configuredWindow || contextWindow;
// 3. 应用安全边际
const usableWindow = Math.floor(
effectiveWindow * SAFETY_MARGIN
);
return {
tokens: effectiveWindow,
usableTokens: usableWindow,
minTokens: CONTEXT_WINDOW_HARD_MIN_TOKENS,
warnThreshold: CONTEXT_WINDOW_WARN_BELOW_TOKENS,
};
}
上下文窗口发现:
async function findModelContextWindow(
modelId: string
): Promise<number> {
// 1. 检查缓存
if (MODEL_CACHE.has(modelId)) {
return MODEL_CACHE.get(modelId)!;
}
// 2. 从配置文件加载
const modelsConfig = await loadModelsConfig();
const configuredWindow = modelsConfig[modelId]?.contextWindow;
// 3. 从提供商 API 发现
if (!configuredWindow) {
const discoveredWindow = await discoverFromProvider(modelId);
if (discoveredWindow) {
MODEL_CACHE.set(modelId, discoveredWindow);
return discoveredWindow;
}
}
// 4. 使用默认值
return DEFAULT_CONTEXT_TOKENS;
}
支持的模型:
| 提供商 | 模型 | 上下文窗口 |
|---|---|---|
| Anthropic | Claude 3 Opus | 200K |
| Anthropic | Claude 3.5 Sonnet | 200K |
| OpenAI | GPT-4 Turbo | 128K |
| OpenAI | GPT-4o | 128K |
| Gemini 1.5 Pro | 1M | |
| Gemini 1.5 Flash | 1M |
3.2.2 上下文窗口保护
文件:src/agents/context-window-guard.ts
保护机制:
export function evaluateContextWindowGuard(
params: ContextWindowGuardParams
): ContextWindowGuardResult {
const { tokens, window, safetyMargin } = params;
const usableWindow = Math.floor(window * safetyMargin);
// 1. 硬性限制检查
if (tokens > window) {
return {
status: 'overflow',
action: 'must_compact',
message: `Token count (${tokens}) exceeds context window (${window})`,
};
}
// 2. 警告阈值检查
if (tokens > usableWindow) {
return {
status: 'warning',
action: 'should_compact',
message: `Token count (${tokens}) approaches usable window (${usableWindow})`,
};
}
// 3. 正常状态
return {
status: 'ok',
action: 'none',
message: 'Token count within safe limits',
};
}
保护级别:
- 硬性限制:超过上下文窗口,必须压缩
- 警告阈值:接近可用窗口,建议压缩
- 安全边际:留出 20% 缓冲空间
3.3 故障转移(failover-error.ts)
3.3.1 错误分类
错误类型:
export type FailoverReason =
| 'auth' // 认证失败
| 'rate_limit' // 速率限制
| 'context_overflow' // 上下文溢出
| 'compaction_failure' // 压缩失败
| 'timeout' // 超时
| 'server_error' // 服务器错误
| 'billing' // 计费问题
| 'model_unavailable' // 模型不可用
| 'unknown'; // 未知错误
export function classifyFailoverReason(
error: Error
): FailoverReason {
// 1. 认证错误
if (isAuthAssistantError(error)) {
return 'auth';
}
// 2. 速率限制
if (isRateLimitAssistantError(error)) {
return 'rate_limit';
}
// 3. 上下文溢出
if (isLikelyContextOverflowError(error)) {
return 'context_overflow';
}
// 4. 压缩失败
if (isCompactionFailureError(error)) {
return 'compaction_failure';
}
// 5. 超时
if (isTimeoutErrorMessage(error)) {
return 'timeout';
}
// 6. 计费问题
if (isBillingAssistantError(error)) {
return 'billing';
}
// 7. 服务器错误
if (isFailoverErrorMessage(error)) {
return 'server_error';
}
// 8. 未知错误
return 'unknown';
}
3.3.2 故障转移策略
配置文件认证:
export async function resolveAuthProfileOrder(
model: ModelReference,
config: OpenClawConfig
): Promise<ResolvedAuthProfile[]> {
const profiles = config.agents?.defaults?.authProfiles;
// 1. 获取配置的认证配置文件
const configuredProfiles = profiles
?.filter(p => p.model === model.id || p.model === '*')
.sort((a, b) => (a.order || 0) - (b.order || 0));
// 2. 应用冷却策略
const availableProfiles = configuredProfiles.filter(
p => !isProfileInCooldown(p.id)
);
// 3. 返回可用配置文件
return availableProfiles.map(p => ({
id: p.id,
apiKey: p.apiKey,
baseUrl: p.baseUrl,
order: p.order || 0,
}));
}
故障转移执行:
export async function resolveFailoverStatus(
error: Error,
params: FailoverParams
): Promise<FailoverStatus> {
const reason = classifyFailoverReason(error);
// 1. 标记失败
if (params.currentProfile) {
await markAuthProfileFailure(params.currentProfile.id, {
reason,
timestamp: Date.now(),
});
}
// 2. 查找下一个可用配置文件
const nextProfile = await findNextAvailableProfile({
model: params.model,
excludeProfileId: params.currentProfile?.id,
});
if (nextProfile) {
return {
shouldRetry: true,
nextProfile,
reason,
backoffMs: calculateBackoffMs(reason),
};
}
// 3. 没有可用的配置文件
return {
shouldRetry: false,
reason,
message: 'No available auth profiles',
};
}
function calculateBackoffMs(reason: FailoverReason): number {
const backoffPolicies: Record<FailoverReason, BackoffPolicy> = {
auth: { initialMs: 1000, maxMs: 30000, factor: 2 },
rate_limit: { initialMs: 5000, maxMs: 60000, factor: 2 },
context_overflow: { initialMs: 100, maxMs: 1000, factor: 1.5 },
timeout: { initialMs: 2000, maxMs: 30000, factor: 2 },
server_error: { initialMs: 1000, maxMs: 10000, factor: 1.5 },
billing: { initialMs: 0, maxMs: 0, factor: 1 },
model_unavailable: { initialMs: 0, maxMs: 0, factor: 1 },
unknown: { initialMs: 2000, maxMs: 30000, factor: 2 },
};
const policy = backoffPolicies[reason] || backoffPolicies.unknown;
return computeBackoff(policy);
}
故障转移特性:
- 智能分类:准确识别错误类型
- 冷却策略:失败的配置文件暂时禁用
- 回退算法:指数退避 + 随机抖动
- 自动恢复:冷却期后自动恢复配置文件
3.4 模型认证(model-auth.ts)
3.4.1 认证模式
认证类型:
export type ModelAuthMode =
| 'api_key' // API 密钥
| 'oauth' // OAuth
| 'profile' // 认证配置文件
| 'env' // 环境变量
| 'none'; // 无认证
export async function resolveModelAuthMode(
model: ModelReference
): Promise<ModelAuthMode> {
// 1. 检查是否配置了认证配置文件
if (hasAuthProfiles(model.id)) {
return 'profile';
}
// 2. 检查是否配置了 OAuth
if (hasOAuthConfig(model.id)) {
return 'oauth';
}
// 3. 检查环境变量
if (hasEnvApiKey(model.id)) {
return 'env';
}
// 4. 检查 API 密钥
if (hasApiKey(model.id)) {
return 'api_key';
}
// 5. 无需认证
return 'none';
}
3.4.2 API 密钥解析
密钥来源:
export async function getApiKeyForModel(
model: ModelReference
): Promise<string | undefined> {
const provider = normalizeProviderId(model.provider);
// 1. 从配置文件读取
const configKey = `agents.defaults.providers.${provider}.apiKey`;
const configValue = config.get(configKey);
if (configValue) {
return resolveSecretInput(configValue);
}
// 2. 从环境变量读取
const envVar = getEnvVarForModel(model);
if (envVar) {
return process.env[envVar];
}
// 3. 从 SecretRef 解析
const secretRef = getSecretRefForModel(model);
if (secretRef) {
return await resolveSecretInput(secretRef);
}
return undefined;
}
function getEnvVarForModel(model: ModelReference): string | undefined {
const provider = normalizeProviderId(model.provider);
const envVars: Record<string, string> = {
openai: 'OPENAI_API_KEY',
anthropic: 'ANTHROPIC_API_KEY',
google: 'GOOGLE_API_KEY',
gemini: 'GEMINI_API_KEY',
mistral: 'MISTRAL_API_KEY',
// ...
};
return envVars[provider];
}
3.4.3 GitHub Copilot 认证
特殊处理:
export async function getGitHubCopilotToken(
githubToken: string
): Promise<string> {
// 1. 获取 Copilot 认证
const auth = await fetch('https://api.github.com/copilot_internal/v2/token/get', {
headers: {
'Authorization': `token ${githubToken}`,
'Accept': 'application/json',
},
});
const data = await auth.json();
// 2. 提取访问令牌
const accessToken = data.token;
// 3. 设置自动刷新
if (data.expires_at) {
scheduleTokenRefresh(accessToken, data.expires_at);
}
return accessToken;
}
function scheduleTokenRefresh(
token: string,
expiresAt: number
): void {
const refreshMargin = 5 * 60 * 1000; // 5 分钟前刷新
const refreshTime = expiresAt - refreshMargin;
const delay = Math.max(refreshTime - Date.now(), 0);
setTimeout(async () => {
try {
const newToken = await getGitHubCopilotToken(githubToken);
// 更新缓存的令牌
COPILOT_TOKEN_CACHE.set(token, newToken);
} catch (error) {
log.error('Failed to refresh Copilot token', error);
// 重试
setTimeout(() => scheduleTokenRefresh(token, expiresAt), 60000);
}
}, delay);
}
3.5 模型选择(model-selection.ts)
3.5.1 模型解析
模型标识符:
export function normalizeProviderId(
provider: string | undefined
): string {
if (!provider) return 'openai';
const normalized = provider.toLowerCase().trim();
// 标准化提供商名称
const aliases: Record<string, string> = {
'openai-codex': 'openai',
'chatgpt': 'openai',
'claude': 'anthropic',
'gemini': 'google',
'mistral-ai': 'mistral',
'qwen': 'alibaba',
// ...
};
return aliases[normalized] || normalized;
}
export function parseModelReference(
modelId: string
): ModelReference {
// 格式:provider/model 或仅 model
const parts = modelId.split('/');
if (parts.length === 2) {
return {
provider: normalizeProviderId(parts[0]),
id: parts[1],
fullName: modelId,
};
}
// 默认提供商
return {
provider: 'openai',
id: modelId,
fullName: `openai/${modelId}`,
};
}
3.5.2 默认模型选择
选择策略:
export async function resolveDefaultModelForAgent(
agentId: string,
config: OpenClawConfig
): Promise<ModelReference> {
// 1. 检查 Agent 特定配置
const agentConfig = resolveAgentConfig(agentId, config);
if (agentConfig.model) {
return parseModelReference(agentConfig.model);
}
// 2. 检查全局默认配置
const globalDefault = config.agents?.defaults?.model;
if (globalDefault) {
return parseModelReference(globalDefault);
}
// 3. 使用内置默认值
return {
provider: DEFAULT_PROVIDER,
id: DEFAULT_MODEL,
fullName: `${DEFAULT_PROVIDER}/${DEFAULT_MODEL}`,
};
}
3.5.3 模型能力检测
功能支持:
export async function supportsModelTools(
model: ModelReference
): Promise<boolean> {
// 1. 检查提供商
const provider = normalizeProviderId(model.provider);
// 2. 已知支持工具的提供商
const toolSupportProviders = new Set([
'openai',
'anthropic',
'google',
'mistral',
'ollama',
]);
if (!toolSupportProviders.has(provider)) {
return false;
}
// 3. 检查特定模型
const unsupportedModels = new Set([
'gpt-3.5-turbo-instruct',
'text-davinci-003',
// ...
]);
if (unsupportedModels.has(model.id)) {
return false;
}
return true;
}
export async function supportsModelImages(
model: ModelReference
): Promise<boolean> {
// 1. 检查提供商
const provider = normalizeProviderId(model.provider);
// 2. 已知支持图像的提供商
const imageSupportProviders = new Set([
'openai',
'anthropic',
'google',
]);
if (!imageSupportProviders.has(provider)) {
return false;
}
// 3. 检查特定模型
const imageCapableModels = new Set([
'gpt-4-vision-preview',
'gpt-4o',
'claude-3-opus',
'claude-3.5-sonnet',
'gemini-1.5-pro',
'gemini-1.5-flash',
]);
return imageCapableModels.has(model.id);
}
4. 关键流程分析
4.1 完整的智能体执行流程
4.2 工具执行流程
4.3 上下文压缩流程
5. 数据流与状态管理
5.1 会话状态
会话数据结构:
interface Session {
id: string;
history: AgentMessage[];
compactionCount?: number;
createdAt: number;
updatedAt: number;
// 上下文信息
contextWindow: ContextWindowInfo;
model: ModelReference;
// 运行时状态
status: 'idle' | 'running' | 'aborted';
currentRun?: EmbeddedPiRunMeta;
// 统计信息
stats: {
totalTurns: number;
totalTokens: number;
totalCost: number;
};
}
5.2 消息流
消息类型:
type AgentMessage =
| UserMessage
| AssistantMessage
| SystemMessage
| ToolResultMessage;
interface UserMessage {
role: 'user';
content: string | ContentBlock[];
timestamp: number;
metadata?: UserMessageMetadata;
}
interface AssistantMessage {
role: 'assistant';
content: string | ContentBlock[];
timestamp: number;
toolCalls?: ToolCall[];
metadata?: AssistantMessageMetadata;
}
interface ToolResultMessage {
role: 'tool_result';
toolCallId: string;
content: string | ToolResult;
isError?: boolean;
truncated?: boolean;
}
5.3 状态持久化
会话文件格式:
{
"id": "session-uuid",
"agentId": "main",
"history": [
{
"role": "user",
"content": "用户消息",
"timestamp": 1234567890
},
{
"role": "assistant",
"content": "助手回复",
"timestamp": 1234567891
}
],
"compactionCount": 3,
"contextWindow": {
"tokens": 200000,
"usableTokens": 160000
},
"model": {
"provider": "anthropic",
"id": "claude-3.5-sonnet"
},
"stats": {
"totalTurns": 10,
"totalTokens": 15000
}
}
6. 工具系统
6.1 工具架构
工具定义:
interface AnyAgentTool {
name: string;
description: string;
parameters: ToolParameters;
handler?: ToolHandler;
permissions?: ToolPermissions;
category?: ToolCategory;
}
interface ToolParameters {
type: 'object';
properties: Record<string, ParameterDefinition>;
required: string[];
}
interface ParameterDefinition {
type: string;
description: string;
enum?: string[];
default?: unknown;
}
6.2 工具分类
90+ 工具分类:
| 类别 | 工具 | 数量 |
|---|---|---|
| 文件操作 | file_browser, text_editor, read, write |
5 |
| 执行环境 | bash, python_repl, node_exec |
3 |
| 网络工具 | web_search, web_fetch, browser |
3 |
| 会话管理 | sessions_list, sessions_history, sessions_send |
5 |
| 记忆系统 | memory_search, memory_add |
2 |
| 平台操作 | telegram_actions, slack_actions, discord_actions |
15+ |
| 系统工具 | cron_add, gateway_status, agents_list |
8 |
| 媒体处理 | image_tool, pdf_tool, tts_tool |
3 |
| 其他 | canvas, nodes, subagents |
5 |
6.3 工具创建
工具创建流程:
export function createOpenClawCodingTools(
params: CreateToolsParams
): AnyAgentTool[] {
const tools: AnyAgentTool[] = [];
// 1. 添加 SDK 内置工具
tools.push(...codingTools);
// 2. 添加文件操作工具
tools.push(
createHostWorkspaceReadTool(params),
createHostWorkspaceWriteTool(params),
createHostWorkspaceEditTool(params)
);
// 3. 添加执行工具
tools.push(
createExecTool(params.execDefaults),
createProcessTool(params.processDefaults)
);
// 4. 添加网络工具
tools.push(
createWebSearchTool(params),
createWebFetchTool(params)
);
// 5. 添加记忆工具
tools.push(
createMemorySearchTool(params.memoryConfig)
);
// 6. 添加平台工具
tools.push(...listChannelAgentTools(params));
// 7. 应用权限策略
return applyToolPolicyPipeline(tools, params);
}
6.4 工具权限
权限系统:
interface ToolPermissions {
allow?: string[];
deny?: string[];
ownerOnly?: boolean;
requireAuth?: boolean;
}
function applyToolPolicyPipeline(
tools: AnyAgentTool[],
params: ToolPolicyParams
): AnyAgentTool[] {
let filtered = tools;
// 1. 应用配置文件策略
filtered = resolveToolProfilePolicy(filtered, params.profile);
// 2. 应用所有者策略
filtered = applyOwnerOnlyToolPolicy(filtered, params.owner);
// 3. 应用群组策略
filtered = resolveGroupToolPolicy(filtered, params.groupPolicy);
// 4. 应用子智能体策略
filtered = resolveSubagentToolPolicy(filtered, params.subagentDepth);
// 5. 应用消息提供者策略
filtered = applyMessageProviderToolPolicy(filtered, params.messageProvider);
return filtered;
}
6.5 工具执行
执行流程:
async function executeTool(
toolCall: ToolCall,
context: ExecutionContext
): Promise<ToolResult> {
// 1. 查找工具
const tool = findTool(toolCall.name);
if (!tool) {
throw new Error(`Tool not found: ${toolCall.name}`);
}
// 2. 验证参数
const params = validateToolParameters(tool, toolCall.arguments);
// 3. 检查权限
const permissions = checkToolPermissions(tool, context);
if (!permissions.allowed) {
return {
isError: true,
content: `Permission denied: ${permissions.reason}`,
};
}
// 4. 触发 before_tool_call 钩子
await getGlobalHookRunner().emitHook('tool:before_tool_call', {
tool: tool.name,
params,
context,
});
// 5. 执行工具
let result: unknown;
try {
result = await tool.handler(params, context);
} catch (error) {
result = {
isError: true,
content: error.message,
};
}
// 6. 触发 after_tool_call 钩子
await getGlobalHookRunner().emitHook('tool:after_tool_call', {
tool: tool.name,
params,
result,
context,
});
// 7. 格式化结果
return formatToolResult(result);
}
7. 技能系统
7.1 技能架构
技能定义:
interface OpenClawSkillMetadata {
id: string;
name: string;
description: string;
version: string;
permissions: string[];
dependencies?: string[];
binaries?: string[];
}
interface SkillEntry {
id: string;
metadata: OpenClawSkillMetadata;
path: string;
enabled: boolean;
}
7.2 技能加载
加载流程:
export async function loadWorkspaceSkillEntries(
workspaceDir: string
): Promise<SkillEntry[]> {
// 1. 扫描技能目录
const skillDirs = await scanSkillDirectories(workspaceDir);
// 2. 加载技能元数据
const skills: SkillEntry[] = [];
for (const skillDir of skillDirs) {
const metadata = await loadSkillMetadata(skillDir);
if (!metadata) continue;
// 3. 检查依赖
const depsMet = await checkSkillDependencies(metadata);
if (!depsMet) continue;
// 4. 检查二进制文件
const binariesAvailable = await checkSkillBinaries(metadata);
if (!binariesAvailable) continue;
skills.push({
id: metadata.id,
metadata,
path: skillDir,
enabled: true,
});
}
return skills;
}
7.3 技能提示词
提示词构建:
export async function buildWorkspaceSkillsPrompt(
workspaceDir: string
): Promise<string | undefined> {
// 1. 加载技能
const skills = await loadWorkspaceSkillEntries(workspaceDir);
if (skills.length === 0) {
return undefined;
}
// 2. 过滤启用技能
const enabledSkills = skills.filter(s => s.enabled);
// 3. 构建提示词
const sections: string[] = [
'# Available Skills',
'',
'You have access to the following skills:',
'',
];
for (const skill of enabledSkills) {
sections.push(`## ${skill.metadata.name}`);
sections.push(skill.metadata.description);
if (skill.metadata.permissions?.length > 0) {
sections.push('');
sections.push(`**Permissions:** ${skill.metadata.permissions.join(', ')}`);
}
sections.push('');
}
return sections.join('\n');
}
8. 记忆与上下文
8.1 记忆搜索配置
配置结构:
export type ResolvedMemorySearchConfig = {
enabled: boolean;
sources: Array<"memory" | "sessions">;
provider: "openai" | "local" | "gemini" | "voyage" | "mistral" | "ollama" | "auto";
model: string;
chunking: {
tokens: number;
overlap: number;
};
query: {
maxResults: number;
minScore: number;
hybridAlpha: number;
};
store: {
driver: "sqlite";
path: string;
vector: {
enabled: boolean;
extensionPath?: string;
};
};
sync: {
onSessionStart: boolean;
onSearch: boolean;
watch: boolean;
};
};
8.2 记忆搜索流程
搜索执行:
export async function searchMemories(
query: string,
options: SearchOptions
): Promise<MemorySearchResult[]> {
// 1. 查询扩展
const expandedQueries = await expandQuery(query, options);
// 2. 向量搜索
const vectorResults = await searchVector({
queries: expandedQueries,
limit: options.maxResults,
});
// 3. 关键词搜索
const keywordResults = await searchKeyword({
queries: expandedQueries,
limit: options.maxResults,
});
// 4. 混合结果
const hybridResults = mergeHybridResults({
vector: vectorResults,
keyword: keywordResults,
alpha: options.hybridAlpha || 0.7,
});
// 5. 去重和排序
return deduplicateAndRank(hybridResults);
}
8.3 记忆同步
同步策略:
export async function syncSessionToMemory(
session: Session,
options: SyncOptions
): Promise<void> {
// 1. 提取对话历史
const turns = session.history.filter(
m => m.role === 'user' || m.role === 'assistant'
);
// 2. 分块
const chunks = await chunkConversation(turns, {
maxTokens: options.chunking.tokens,
overlap: options.chunking.overlap,
});
// 3. 向量化
const embeddings = await embedChunks(chunks, options);
// 4. 存储到向量数据库
await storeEmbeddings(embeddings, options);
// 5. 更新索引
await updateMemoryIndex(session.id, options);
}
9. 错误处理与容错
9.1 错误分类
错误类型层次:
Error
├── AgentError
│ ├── AuthError
│ ├── RateLimitError
│ ├── ContextOverflowError
│ ├── CompactionFailureError
│ └── TimeoutError
├── ToolError
│ ├── ToolNotFoundError
│ ├── ToolPermissionError
│ └── ToolExecutionError
└── SystemError
├── ConfigurationError
├── DependencyError
└── FileSystemError
9.2 重试策略
重试配置:
interface RetryConfig {
maxAttempts: number;
initialDelayMs: number;
maxDelayMs: number;
backoffFactor: number;
jitter: number;
retryableErrors: string[];
}
const DEFAULT_RETRY_CONFIG: RetryConfig = {
maxAttempts: 3,
initialDelayMs: 1000,
maxDelayMs: 30000,
backoffFactor: 2,
jitter: 0.2,
retryableErrors: [
'ECONNRESET',
'ETIMEDOUT',
'ENOTFOUND',
'ECONNREFUSED',
],
};
export async function retryAsync<T>(
fn: () => Promise<T>,
config: RetryConfig = DEFAULT_RETRY_CONFIG
): Promise<T> {
let lastError: Error | undefined;
for (let attempt = 0; attempt < config.maxAttempts; attempt++) {
try {
return await fn();
} catch (error) {
lastError = error as Error;
// 检查是否可重试
if (!isRetryableError(error, config)) {
throw error;
}
// 计算延迟
const delay = calculateRetryDelay(attempt, config);
await sleep(delay);
}
}
throw lastError;
}
9.3 断路器模式
断路器实现:
class CircuitBreaker {
private failures: number = 0;
private lastFailureTime: number = 0;
private state: 'closed' | 'open' | 'half-open' = 'closed';
constructor(
private threshold: number = 5,
private timeoutMs: number = 60000
) {}
async execute<T>(fn: () => Promise<T>): Promise<T> {
if (this.state === 'open') {
if (this.shouldAttemptReset()) {
this.state = 'half-open';
} else {
throw new Error('Circuit breaker is open');
}
}
try {
const result = await fn();
this.onSuccess();
return result;
} catch (error) {
this.onFailure();
throw error;
}
}
private onSuccess(): void {
this.failures = 0;
this.state = 'closed';
}
private onFailure(): void {
this.failures++;
this.lastFailureTime = Date.now();
if (this.failures >= this.threshold) {
this.state = 'open';
}
}
private shouldAttemptReset(): boolean {
return Date.now() - this.lastFailureTime > this.timeoutMs;
}
}
10. 性能优化
10.1 缓存策略
多级缓存:
class MultiLevelCache {
private memoryCache: Map<string, CacheEntry>;
private diskCache: DiskCache;
async get<T>(key: string): Promise<T | undefined> {
// 1. 内存缓存
const memEntry = this.memoryCache.get(key);
if (memEntry && !memEntry.expired) {
return memEntry.value as T;
}
// 2. 磁盘缓存
const diskEntry = await this.diskCache.get<T>(key);
if (diskEntry) {
// 回填内存缓存
this.memoryCache.set(key, {
value: diskEntry,
expires: Date.now() + 3600000, // 1 小时
});
return diskEntry;
}
return undefined;
}
async set<T>(key: string, value: T, ttl: number): Promise<void> {
// 写入内存缓存
this.memoryCache.set(key, {
value,
expires: Date.now() + ttl,
});
// 写入磁盘缓存
await this.diskCache.set(key, value, ttl);
}
}
10.2 流式处理优化
流式优化:
export async function processStream<T, R>(
stream: AsyncIterable<T>,
processor: (item: T) => Promise<R>,
options: StreamOptions
): Promise<R[]> {
const results: R[] = [];
const concurrent = options.concurrent || 1;
const queue: Promise<R>[] = [];
for await (const item of stream) {
// 添加到队列
const promise = processor(item);
queue.push(promise);
// 限制并发
if (queue.length >= concurrent) {
const result = await Promise.race(queue);
results.push(result);
queue.splice(queue.indexOf(promise), 1);
}
}
// 等待剩余任务
const remaining = await Promise.all(queue);
results.push(...remaining);
return results;
}
10.3 Token 估算优化
快速估算:
class TokenEstimator {
private cache: Map<string, number>;
constructor() {
this.cache = new Map();
}
estimate(text: string): number {
// 1. 检查缓存
const cached = this.cache.get(text);
if (cached !== undefined) {
return cached;
}
// 2. 快速估算(基于字符数)
const fastEstimate = Math.ceil(text.length / 4);
// 3. 更新缓存
if (text.length < 1000) {
this.cache.set(text, fastEstimate);
}
return fastEstimate;
}
async estimateAccurate(text: string): Promise<number> {
// 1. 检查缓存
const cached = this.cache.get(text);
if (cached !== undefined) {
return cached;
}
// 2. 使用 PI 库精确估算
const accurate = await estimateTokens(text);
// 3. 更新缓存
this.cache.set(text, accurate);
return accurate;
}
}
10.4 批处理优化
批量向量化:
export async function batchEmbed<T>(
items: T[],
embedFn: (item: T) => Promise<number[]>,
options: BatchOptions
): Promise<number[][]> {
const { batchSize = 10, concurrency = 3 } = options;
const results: number[][] = [];
// 分批处理
for (let i = 0; i < items.length; i += batchSize) {
const batch = items.slice(i, i + batchSize);
// 并发处理批次
const batchPromises = batch.map(item => embedFn(item));
const batchResults = await Promise.all(batchPromises);
results.push(...batchResults);
// 限制并发批次
if ((i / batchSize + 1) % concurrency === 0) {
await sleep(100); // 避免速率限制
}
}
return results;
}
总结
src/agents/ 模块是 OpenClaw 的核心,实现了完整的 AI 智能体功能。主要特点包括:
核心优势
- 模块化设计:清晰的职责分离,易于维护和扩展
- 高性能:流式处理、缓存优化、批量处理
- 容错性强:完善的错误处理和重试机制
- 可扩展:插件化的工具和技能系统
- 安全可靠:权限控制和策略执行
技术亮点
- PI Agent 引擎:基于成熟的 AI 框架
- 智能压缩:自动优化上下文使用
- 故障转移:多认证配置文件自动切换
- 工具系统:90+ 内置工具,易于扩展
- 记忆系统:向量搜索和语义检索
学习建议
- 从主流程入手:理解
runEmbeddedPiAgent的完整流程 - 深入关键组件:学习压缩、故障转移、工具执行
- 实践扩展:尝试添加自定义工具和技能
- 阅读测试:通过测试用例理解边界情况
希望这份详细的源码解读能帮助您深入理解 OpenClaw 的智能体实现!🎉
更多推荐

所有评论(0)