本文档是对 OpenHarness 开源项目的超详细代码解读,逐行分析核心实现,深入理解其架构设计思想。

项目地址:https://github.com/HKUDS/OpenHarness
在这里插入图片描述


目录

  1. 项目概述与架构设计
  2. 核心引擎详解:QueryEngine 与 Agent Loop
  3. 消息系统:从数据结构到序列化
  4. 工具系统:43+ 工具的实现原理
  5. 查询执行流程:run_query 深度剖析
  6. ohmo 个人 Agent 系统
  7. 会话管理与记忆系统
  8. 权限控制与安全机制
  9. CLI 与命令系统
  10. 架构设计哲学总结

第一章:项目概述与架构设计

1.1 项目背景

OpenHarness 是香港大学数据智能实验室(HKUDS)开源的轻量级 AI Agent 框架。项目的核心目标是:

用纯 Python 实现,仅 11,733 行代码,复刻 Claude Code 98% 的核心工具能力。

这个目标的实现依赖于一个核心设计理念:

“The model is the agent. The code is the harness.”
(模型是 Agent,代码是 Harness)

这意味着 OpenHarness 不追求自己实现 AI 模型,而是专注于构建一个高效、可扩展、可定制的 “装备”(Harness),让各种大语言模型都能在这个装备上发挥出最大能力。

1.2 整体架构概览

OpenHarness 采用分层解耦的架构设计,整体可以分为以下几个核心层次:

在这里插入图片描述

1.3 目录结构详解

OpenHarness/
├── ohmo/                          # 【个人 Agent 应用】
│   ├── cli.py                     # 24KB,ohmo 命令行入口(typer 实现)
│   ├── runtime.py                 # 运行时封装,连接后端和 UI
│   ├── workspace.py               # 工作空间管理(人格系统核心)
│   ├── session_storage.py         # 会话持久化实现
│   ├── memory.py                  # 记忆管理命令
│   ├── prompts.py                 # 系统提示词构建
│   └── gateway/                   # IM 网关目录
│       ├── service.py             # 网关生命周期管理(~200行)
│       ├── bridge.py              # 消息总线桥接
│       ├── runtime.py             # 运行时池管理
│       ├── config.py              # 网关配置管理
│       ├── models.py              # Pydantic 数据模型
│       └── router.py              # 消息路由逻辑
│
├── src/openharness/               # 【核心框架】
│   ├── cli.py                     # 主 CLI,2423行,包含所有命令
│   ├── engine/                    # 引擎核心目录
│   │   ├── query_engine.py        # QueryEngine 类(~214行)
│   │   ├── query.py               # run_query 核心循环(~1041行)
│   │   ├── messages.py            # 消息数据结构(~222行)
│   │   ├── stream_events.py       # 流式事件定义
│   │   └── cost_tracker.py        # Token 成本追踪
│   ├── tools/                     # 43+ 工具实现
│   │   ├── base.py                # 工具抽象基类(~81行)
│   │   ├── bash_tool.py           # 命令行执行(~219行)
│   │   ├── file_edit_tool.py      # 文件编辑
│   │   ├── agent_tool.py          # 子 Agent 调用
│   │   └── ...                    # 其他 40 个工具
│   ├── api/                       # API 客户端抽象
│   ├── auth/                      # 认证管理系统
│   ├── memory/                    # 记忆系统实现
│   ├── permissions/               # 权限控制系统
│   ├── skills/                    # Markdown 技能系统
│   ├── plugins/                   # 插件系统
│   ├── mcp/                       # MCP 协议支持
│   ├── ui/                        # 终端 UI 后端
│   └── swarm/                     # 多 Agent 协调
│
└── frontend/                      # React/Ink 终端 UI(前端)

1.4 核心设计原则

原则 1:Harness 分离

OpenHarness 遵循 “Harness 分离原则”,将 AI Agent 系统明确划分为两个层次:

  1. 模型层(Agent):提供理解和推理能力(由外部 LLM 提供)
  2. 装备层(Harness):提供工具执行、上下文管理、错误处理等基础设施

这种分离的好处是:

  • 模型可替换:可以轻松切换不同的 LLM(Claude、GPT、Gemini、Kimi 等)
  • 能力可扩展:通过插件和技能系统扩展能力,无需修改核心代码
  • 行为可定制:通过配置和提示词工程定制 Agent 行为
原则 2:分层解耦

系统采用三层解耦架构:

Brain (大脑层) → Hands (双手层) → Session (记忆层)
  • Brain 层:负责与模型 API 交互,管理对话流
  • Hands 层:负责工具执行,与外部环境交互
  • Session 层:负责状态持久化,支持会话恢复

每层都可以独立失败、独立恢复、独立扩展。

原则 3:配置即代码

OpenHarness 大量使用 Markdown 文件作为配置格式,包括:

  • SOUL.md:定义 Agent 人格
  • user.md:记录用户画像
  • MEMORY.md:持久化记忆
  • SKILL.md:技能定义

这种设计的优势是:

  • 可读性强:Markdown 是人类可读的
  • 版本友好:方便用 Git 管理
  • 易于编辑:任何文本编辑器都可以修改

第二章:核心引擎详解

2.1 QueryEngine 类解析

QueryEngine 是整个 OpenHarness 最核心的类,位于 src/openharness/engine/query_engine.py。它是 Agent 的 “大脑”,负责管理与 LLM 的对话循环。

2.1.1 类定义与初始化
class QueryEngine:
    """Owns conversation history and the tool-aware model loop."""
    
    def __init__(
        self,
        *,
        api_client: SupportsStreamingMessages,    # 模型 API 客户端
        tool_registry: ToolRegistry,              # 工具注册表
        permission_checker: PermissionChecker,    # 权限检查器
        cwd: str | Path,                          # 当前工作目录
        model: str,                              # 模型标识
        system_prompt: str,                       # 系统提示词
        max_tokens: int = 4096,                   # 最大输出 token
        context_window_tokens: int | None = None, # 上下文窗口大小
        auto_compact_threshold_tokens: int | None = None,  # 自动压缩阈值
        max_turns: int | None = 8,                # 最大轮数限制
        permission_prompt: PermissionPrompt | None = None,  # 权限确认回调
        ask_user_prompt: AskUserPrompt | None = None,       # 用户询问回调
        hook_executor: HookExecutor | None = None,          # 钩子执行器
        tool_metadata: dict[str, object] | None = None,     # 工具元数据
    ) -> None:

参数详解:

  1. api_client: 实现了 SupportsStreamingMessages 接口的 API 客户端,负责与 LLM 通信。这个抽象允许接入不同的模型提供商(Anthropic、OpenAI、Kimi 等)。

  2. tool_registry: ToolRegistry 实例,管理所有可用工具。引擎通过它查找和执行工具。

  3. permission_checker: PermissionChecker 实例,负责评估工具调用的权限。可以配置哪些工具可以在哪些路径上执行。

  4. cwd: 当前工作目录,所有相对路径都基于此目录解析。

  5. model: 模型标识字符串,如 "claude-3-5-sonnet-20241022"

  6. system_prompt: 系统提示词,定义 Agent 的行为准则和能力边界。

  7. max_tokens: 每轮对话的最大输出 token 数,默认 4096。

  8. context_window_tokens: 上下文窗口大小,用于自动压缩决策。

  9. auto_compact_threshold_tokens: 自动压缩阈值,当对话超过此 token 数时触发压缩。

  10. max_turns: 每轮用户输入的最大 Agent 回合数,防止无限循环,默认 8。

  11. permission_prompt: 异步回调函数,当需要用户确认权限时调用。

  12. ask_user_prompt: 异步回调函数,当 Agent 需要向用户提问时调用。

  13. hook_executor: 钩子执行器,在关键生命周期节点执行自定义逻辑。

  14. tool_metadata: 工具元数据字典,用于在工具调用之间传递状态。

初始化逻辑:

self._api_client = api_client
self._tool_registry = tool_registry
self._permission_checker = permission_checker
self._cwd = Path(cwd).resolve()
self._model = model
self._system_prompt = system_prompt
self._max_tokens = max_tokens
self._context_window_tokens = context_window_tokens
self._auto_compact_threshold_tokens = auto_compact_threshold_tokens
self._max_turns = max_turns
self._permission_prompt = permission_prompt
self._ask_user_prompt = ask_user_prompt
self._hook_executor = hook_executor
self._tool_metadata = tool_metadata or {}
self._messages: list[ConversationMessage] = []  # 对话历史
self._cost_tracker = CostTracker()              # 成本追踪器

这里的关键设计是 _messages 列表,它保存了完整的对话历史。每次用户输入、模型响应、工具结果都会被追加到这个列表中。

2.1.2 核心属性
@property
def messages(self) -> list[ConversationMessage]:
    """Return the current conversation history."""
    return list(self._messages)

@property
def max_turns(self) -> int | None:
    """Return the maximum number of agentic turns per user input, if capped."""
    return self._max_turns

@property
def api_client(self) -> SupportsStreamingMessages:
    """Return the active API client."""
    return self._api_client

@property
def model(self) -> str:
    """Return the active model identifier."""
    return self._model

@property
def system_prompt(self) -> str:
    """Return the active system prompt."""
    return self._system_prompt

@property
def tool_metadata(self) -> dict[str, object]:
    """Return the mutable tool metadata/carry-over state."""
    return self._tool_metadata

@property
def total_usage(self):
    """Return the total usage across all turns."""
    return self._cost_tracker.total

这些属性提供了对引擎状态的只读访问。total_usage 通过 CostTracker 追踪累计的 token 消耗和成本。

2.1.3 对话管理方法
def clear(self) -> None:
    """Clear the in-memory conversation history."""
    self._messages.clear()
    self._cost_tracker = CostTracker()

clear() 方法用于重置对话状态,通常在开始新会话时调用。

def set_system_prompt(self, prompt: str) -> None:
    """Update the active system prompt for future turns."""
    self._system_prompt = prompt

def set_model(self, model: str) -> None:
    """Update the active model for future turns."""
    self._model = model

def set_api_client(self, api_client: SupportsStreamingMessages) -> None:
    """Update the active API client for future turns."""
    self._api_client = api_client

def set_max_turns(self, max_turns: int | None) -> None:
    """Update the maximum number of agentic turns per user input."""
    self._max_turns = None if max_turns is None else max(1, int(max_turns))

def set_permission_checker(self, checker: PermissionChecker) -> None:
    """Update the active permission checker for future turns."""
    self._permission_checker = checker

这些 setter 方法允许在运行时动态修改引擎配置,无需重新创建实例。

2.1.4 核心方法:submit_message
async def submit_message(
    self, prompt: str | ConversationMessage
) -> AsyncIterator[StreamEvent]:
    """Append a user message and execute the query loop."""
    # 1. 构建用户消息
    user_message = (
        prompt
        if isinstance(prompt, ConversationMessage)
        else ConversationMessage.from_user_text(prompt)
    )
    
    # 2. 记录用户目标到元数据
    if user_message.text.strip():
        remember_user_goal(self._tool_metadata, user_message.text)
    
    # 3. 追加到对话历史
    self._messages.append(user_message)
    
    # 4. 执行钩子(如果有)
    if self._hook_executor is not None:
        await self._hook_executor.execute(
            HookEvent.USER_PROMPT_SUBMIT,
            {
                "event": HookEvent.USER_PROMPT_SUBMIT.value,
                "prompt": user_message.text,
            },
        )
    
    # 5. 构建查询上下文
    context = QueryContext(
        api_client=self._api_client,
        tool_registry=self._tool_registry,
        permission_checker=self._permission_checker,
        cwd=self._cwd,
        model=self._model,
        system_prompt=self._system_prompt,
        max_tokens=self._max_tokens,
        context_window_tokens=self._context_window_tokens,
        auto_compact_threshold_tokens=self._auto_compact_threshold_tokens,
        max_turns=self._max_turns,
        permission_prompt=self._permission_prompt,
        ask_user_prompt=self._ask_user_prompt,
        hook_executor=self._hook_executor,
        tool_metadata=self._tool_metadata,
    )
    
    # 6. 准备查询消息列表
    query_messages = list(self._messages)
    
    # 7. 添加协调器上下文(如果存在)
    coordinator_context = self._build_coordinator_context_message()
    if coordinator_context is not None:
        query_messages.append(coordinator_context)
    
    # 8. 执行查询循环,流式返回事件
    async for event, usage in run_query(context, query_messages):
        if isinstance(event, AssistantTurnComplete):
            # 更新对话历史
            self._messages = list(query_messages)
        if usage is not None:
            self._cost_tracker.add(usage)
        yield event

这个方法的工作流程:

  1. 消息构建:将输入转换为 ConversationMessage 对象
  2. 目标记录:调用 remember_user_goal() 记录用户意图
  3. 历史更新:将用户消息追加到 _messages
  4. 钩子执行:触发 USER_PROMPT_SUBMIT 钩子
  5. 上下文构建:创建 QueryContext 对象传递所有依赖
  6. 消息准备:复制消息列表,避免修改原始历史
  7. 协调器上下文:添加多 Agent 协调所需的上下文
  8. 查询执行:调用 run_query() 进入核心循环,流式返回事件
2.1.5 继续中断的对话:continue_pending
async def continue_pending(
    self, *, max_turns: int | None = None
) -> AsyncIterator[StreamEvent]:
    """Continue an interrupted tool loop without appending a new user message."""
    context = QueryContext(...)
    async for event, usage in run_query(context, self._messages):
        if usage is not None:
            self._cost_tracker.add(usage)
        yield event

这个方法用于恢复被中断的对话,比如用户中途取消了操作,或者发生了网络错误。它会从当前状态继续,而不是重新开始。

2.1.6 检查待处理的工具调用
def has_pending_continuation(self) -> bool:
    """Return True when the conversation ends with tool results awaiting a follow-up model turn."""
    if not self._messages:
        return False
    last = self._messages[-1]
    if last.role != "user":
        return False
    if not any(isinstance(block, ToolResultBlock) for block in last.content):
        return False
    for msg in reversed(self._messages[:-1]):
        if msg.role != "assistant":
            continue
        return bool(msg.tool_uses)
    return False

这个方法检查对话是否处于 “待继续” 状态:

  • 最后一条消息是用户角色
  • 包含工具结果块
  • 之前的助手消息包含工具调用

这种状态发生在:助手调用了工具,工具执行完成并返回结果,但助手还没有对这些结果做出响应。


第三章:消息系统详解

3.1 消息模型设计

src/openharness/engine/messages.py 定义了 OpenHarness 的消息系统。这个模块使用 Pydantic 来定义强类型的消息模型。

3.1.1 TextBlock:文本内容块
class TextBlock(BaseModel):
    """Plain text content."""
    type: Literal["text"] = "text"
    text: str

最简单的内容块,包含纯文本。Literal["text"] 类型用于在联合类型中进行区分。

3.1.2 ImageBlock:图像内容块
class ImageBlock(BaseModel):
    """Image content encoded inline for multimodal providers."""
    type: Literal["image"] = "image"
    media_type: str
    data: str  # base64 编码的图像数据
    source_path: str = ""

    @classmethod
    def from_path(cls, path: str | Path) -> "ImageBlock":
        """Load a local image file into a base64-backed content block."""
        resolved = Path(path).expanduser().resolve()
        media_type, _ = mimetypes.guess_type(str(resolved))
        if not media_type or not media_type.startswith("image/"):
            raise ValueError(f"Unsupported image attachment: {resolved}")
        payload = base64.b64encode(resolved.read_bytes()).decode("ascii")
        return cls(media_type=media_type, data=payload, source_path=str(resolved))

ImageBlock 支持多模态模型:

  • media_type: MIME 类型,如 image/pngimage/jpeg
  • data: base64 编码的图像数据
  • source_path: 原始文件路径(可选)

from_path 类方法方便从文件系统加载图像。

3.1.3 ToolUseBlock:工具调用块
class ToolUseBlock(BaseModel):
    """A request from the model to execute a named tool."""
    type: Literal["tool_use"] = "tool_use"
    id: str = Field(default_factory=lambda: f"toolu_{uuid4().hex}")
    name: str
    input: dict[str, Any] = Field(default_factory=dict)

当模型决定调用工具时,会生成 ToolUseBlock

  • id: 唯一标识符,格式为 toolu_<16位十六进制>
  • name: 工具名称
  • input: 工具参数,JSON 对象
3.1.4 ToolResultBlock:工具结果块
class ToolResultBlock(BaseModel):
    """Tool result content sent back to the model."""
    type: Literal["tool_result"] = "tool_result"
    tool_use_id: str  # 对应 ToolUseBlock 的 id
    content: str      # 工具输出(文本)
    is_error: bool = False  # 是否出错

工具执行完成后,生成 ToolResultBlock 返回给模型:

  • tool_use_id: 对应工具调用的 ID
  • content: 工具输出内容
  • is_error: 标记是否执行出错
3.1.5 ContentBlock 联合类型
ContentBlock = Annotated[
    TextBlock | ImageBlock | ToolUseBlock | ToolResultBlock,
    Field(discriminator="type"),
]

使用 AnnotatedField(discriminator="type") 定义联合类型,Pydantic 会根据 type 字段自动反序列化为正确的类型。

3.2 ConversationMessage 消息类

class ConversationMessage(BaseModel):
    """A single assistant or user message."""
    role: Literal["user", "assistant"]
    content: list[ContentBlock] = Field(default_factory=list)

ConversationMessage 是对话的基本单元:

  • role: 消息角色,只能是 "user""assistant"
  • content: 内容块列表,可以包含文本、图像、工具调用、工具结果
3.2.1 便利方法
@classmethod
def from_user_text(cls, text: str) -> "ConversationMessage":
    """Construct a user message from raw text."""
    return cls(role="user", content=[TextBlock(text=text)])

@classmethod
def from_user_content(cls, content: list[ContentBlock]) -> "ConversationMessage":
    """Construct a user message from explicit content blocks."""
    return cls(role="user", content=list(content))

工厂方法简化常见场景的创建。

3.2.2 属性访问器
@property
def text(self) -> str:
    """Return concatenated text blocks."""
    return "".join(
        block.text for block in self.content if isinstance(block, TextBlock)
    )

@property
def tool_uses(self) -> list[ToolUseBlock]:
    """Return all tool calls contained in the message."""
    return [block for block in self.content if isinstance(block, ToolUseBlock)]

text 属性方便获取消息中的所有文本内容(忽略图像、工具调用等)。

tool_uses 属性提取所有工具调用块,用于判断助手是否请求了工具执行。

3.2.3 API 参数转换
def to_api_param(self) -> dict[str, Any]:
    """Convert the message into Anthropic SDK message params."""
    return {
        "role": self.role,
        "content": [serialize_content_block(block) for block in self.content],
    }

转换为 Anthropic SDK 所需的格式。

3.2.4 空消息检测
def is_effectively_empty(self) -> bool:
    """Return True when the message carries no useful content."""
    if self.content:
        for block in self.content:
            if isinstance(block, TextBlock) and block.text.strip():
                return False
            if isinstance(block, (ImageBlock, ToolUseBlock, ToolResultBlock)):
                return False
    return True

检测消息是否 “实质为空”:

  • 有非空文本块 → 不为空
  • 有图像、工具调用、工具结果 → 不为空
  • 只有空白文本或没有内容 → 为空

这个方法用于过滤掉模型偶尔返回的空消息。

3.3 消息序列化与清理

3.3.1 序列化函数
def serialize_content_block(block: ContentBlock) -> dict[str, Any]:
    """Convert a local content block into the provider wire format."""
    if isinstance(block, TextBlock):
        return {"type": "text", "text": block.text}

    if isinstance(block, ImageBlock):
        return {
            "type": "image",
            "source": {
                "type": "base64",
                "media_type": block.media_type,
                "data": block.data,
            },
        }

    if isinstance(block, ToolUseBlock):
        return {
            "type": "tool_use",
            "id": block.id,
            "name": block.name,
            "input": block.input,
        }

    return {
        "type": "tool_result",
        "tool_use_id": block.tool_use_id,
        "content": block.content,
        "is_error": block.is_error,
    }

将内部消息块转换为 API 所需的 JSON 格式。

3.3.2 消息清理函数
def sanitize_conversation_messages(
    messages: list[ConversationMessage]
) -> list[ConversationMessage]:
    """Normalize restored conversation history into a provider-safe sequence.

    This drops legacy empty assistant messages and trims malformed trailing tool
    turns, such as an assistant ``tool_use`` message that never received a
    matching user ``tool_result`` response.
    """

这个函数非常重要,用于清理从持久化存储恢复的消息历史:

  1. 删除空助手消息:模型有时会返回空消息,需要过滤掉
  2. 修复未完成的工具调用:如果助手调用了工具但会话中断,工具结果没有返回,需要删除未匹配的工具调用

这种清理确保恢复的消息序列可以被 API 正常处理。


第四章:工具系统详解

4.1 工具基类设计

src/openharness/tools/base.py 定义了工具系统的抽象基类。

4.1.1 工具执行上下文
@dataclass
class ToolExecutionContext:
    """Shared execution context for tool invocations."""
    cwd: Path                                    # 当前工作目录
    metadata: dict[str, Any] = field(default_factory=dict)  # 元数据
    hook_executor: HookExecutor | None = None   # 钩子执行器

ToolExecutionContext 封装了工具执行时的环境信息:

  • cwd: 所有文件操作的基础路径
  • metadata: 工具间共享状态(如已读取的文件、激活的技能)
  • hook_executor: 用于触发工具执行前后的钩子
4.1.2 工具结果
@dataclass(frozen=True)
class ToolResult:
    """Normalized tool execution result."""
    output: str                                 # 输出内容
    is_error: bool = False                      # 是否出错
    metadata: dict[str, Any] = field(default_factory=dict)  # 元数据

ToolResult 是标准化的工具输出:

  • output: 文本输出(即使出错也返回错误信息)
  • is_error: 错误标记,模型会根据这个调整后续行为
  • metadata: 额外信息(如返回码、文件路径等)

使用 frozen=True 使结果不可变,避免副作用。

4.1.3 工具基类
class BaseTool(ABC):
    """Base class for all OpenHarness tools."""
    name: str                                   # 工具名称
    description: str                            # 工具描述(给模型看的)
    input_model: type[BaseModel]               # Pydantic 输入模型

    @abstractmethod
    async def execute(
        self, 
        arguments: BaseModel, 
        context: ToolExecutionContext
    ) -> ToolResult:
        """Execute the tool."""

    def is_read_only(self, arguments: BaseModel) -> bool:
        """Return whether the invocation is read-only."""
        del arguments
        return False

    def to_api_schema(self) -> dict[str, Any]:
        """Return the tool schema expected by the Anthropic Messages API."""
        return {
            "name": self.name,
            "description": self.description,
            "input_schema": self.input_model.model_json_schema(),
        }

BaseTool 是所有工具的抽象基类:

类属性:

  • name: 工具的唯一标识,如 "bash""read_file"
  • description: 自然语言描述,告诉模型这个工具是做什么的、什么时候该用
  • input_model: Pydantic 模型类,定义工具参数的结构和校验规则

抽象方法:

  • execute(): 实际的工具执行逻辑,子类必须实现

可选方法:

  • is_read_only(): 返回此调用是否为只读操作(用于权限控制)
  • to_api_schema(): 转换为 API 所需的工具定义格式

4.2 工具注册表

class ToolRegistry:
    """Map tool names to implementations."""

    def __init__(self) -> None:
        self._tools: dict[str, BaseTool] = {}

    def register(self, tool: BaseTool) -> None:
        """Register a tool instance."""
        self._tools[tool.name] = tool

    def get(self, name: str) -> BaseTool | None:
        """Return a registered tool by name."""
        return self._tools.get(name)

    def list_tools(self) -> list[BaseTool]:
        """Return all registered tools."""
        return list(self._tools.values())

    def to_api_schema(self) -> list[dict[str, Any]]:
        """Return all tool schemas in API format."""
        return [tool.to_api_schema() for tool in self._tools.values()]

ToolRegistry 是简单的字典封装:

  • register(): 注册工具实例
  • get(): 按名称查找工具
  • list_tools(): 获取所有工具
  • to_api_schema(): 批量转换为 API 格式

4.3 BashTool 实现详解

src/openharness/tools/bash_tool.py 实现了命令行执行工具,是 OpenHarness 最常用的工具之一。

4.3.1 输入模型
class BashToolInput(BaseModel):
    """Arguments for the bash tool."""
    command: str = Field(description="Shell command to execute")
    cwd: str | None = Field(default=None, description="Working directory override")
    timeout_seconds: int = Field(default=600, ge=1, le=600)

输入参数:

  • command: 要执行的命令(必需)
  • cwd: 可选的工作目录覆盖
  • timeout_seconds: 超时时间,默认 600 秒,范围 1-600

使用 Pydantic 的 Field 定义描述和校验规则。

4.3.2 工具类定义
class BashTool(BaseTool):
    """Execute a shell command with stdout/stderr capture."""
    name = "bash"
    description = "Run a shell command in the local repository."
    input_model = BashToolInput

基本信息:

  • 名称:"bash"
  • 描述:告诉模型这是一个在本地仓库执行 shell 命令的工具
  • 输入模型:BashToolInput
4.3.3 执行逻辑详解
async def execute(
    self, 
    arguments: BashToolInput, 
    context: ToolExecutionContext
) -> ToolResult:
    # 1. 确定工作目录
    cwd = Path(arguments.cwd).expanduser() if arguments.cwd else context.cwd
    
    # 2. 预检查交互式命令
    preflight_error = _preflight_interactive_command(arguments.command)
    if preflight_error is not None:
        return ToolResult(
            output=preflight_error,
            is_error=True,
            metadata={"interactive_required": True},
        )

步骤 1-2:前置检查

首先解析工作目录,然后检查命令是否可能是交互式的(如 npm create 会提示用户输入)。如果是,提前返回错误,避免挂起。

    # 3. 创建子进程
    process: asyncio.subprocess.Process | None = None
    try:
        process = await create_shell_subprocess(
            arguments.command,
            cwd=cwd,
            prefer_pty=True,  # 优先使用 PTY,支持颜色输出
            stdin=asyncio.subprocess.DEVNULL,  # 非交互式
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.STDOUT,  # 合并 stderr
        )
    except SandboxUnavailableError as exc:
        return ToolResult(output=str(exc), is_error=True)
    except asyncio.CancelledError:
        if process is not None:
            await _terminate_process(process, force=False)
        raise

步骤 3:创建子进程

使用 create_shell_subprocess 创建异步子进程:

  • prefer_pty=True: 请求伪终端,这样命令的输出会保留颜色
  • stdin=DEVNULL: 非交互式,不接收输入
  • stderr=STDOUT: 合并标准错误到标准输出

处理异常情况:

  • SandboxUnavailableError: 沙箱不可用(如果配置了沙箱)
  • CancelledError: 任务被取消,优雅终止进程
    # 4. 等待进程完成或超时
    try:
        await asyncio.wait_for(process.wait(), timeout=arguments.timeout_seconds)
    except asyncio.TimeoutError:
        # 超时处理:读取部分输出,强制终止
        output_buffer = await _drain_available_output(process.stdout)
        await _terminate_process(process, force=True)
        output_buffer.extend(await _read_remaining_output(process))
        return ToolResult(
            output=_format_timeout_output(
                output_buffer,
                command=arguments.command,
                timeout_seconds=arguments.timeout_seconds,
            ),
            is_error=True,
            metadata={"returncode": process.returncode, "timed_out": True},
        )
    except asyncio.CancelledError:
        await _terminate_process(process, force=False)
        raise

步骤 4:超时处理

等待进程完成:

  • 正常完成:继续执行
  • 超时:读取已输出内容,强制终止,返回超时错误
  • 取消:优雅终止,重新抛出异常
    # 5. 读取输出并格式化结果
    output_buffer = await _read_remaining_output(process)
    text = _format_output(output_buffer)
    return ToolResult(
        output=text,
        is_error=process.returncode != 0,
        metadata={"returncode": process.returncode},
    )

步骤 5:结果格式化

读取剩余输出,格式化,返回结果:

  • is_error: 根据返回码判断
  • metadata: 包含返回码信息
4.3.4 进程终止函数
async def _terminate_process(
    process: asyncio.subprocess.Process, 
    *, 
    force: bool
) -> None:
    if process.returncode is not None:
        return  # 已经退出
    
    if force:
        process.kill()  # SIGKILL
        await process.wait()
        return
    
    process.terminate()  # SIGTERM
    try:
        await asyncio.wait_for(process.wait(), timeout=2.0)
    except asyncio.TimeoutError:
        process.kill()  # 优雅终止超时,强制杀死
        await process.wait()

终止逻辑:

  1. 检查进程是否已退出
  2. 如果 force=True,直接发送 SIGKILL
  3. 否则先发送 SIGTERM,等待 2 秒
  4. 如果优雅终止超时,再发送 SIGKILL
4.3.5 输出读取函数
async def _read_remaining_output(
    process: asyncio.subprocess.Process
) -> bytearray:
    """读取进程的所有剩余输出。"""
    output_buffer = bytearray()
    if process.stdout is not None:
        try:
            remaining = await asyncio.wait_for(
                process.stdout.read(),
                timeout=_READ_REMAINING_OUTPUT_TIMEOUT_SECONDS,  # 2 秒
            )
        except asyncio.TimeoutError:
            remaining = b""
        output_buffer.extend(remaining)
    return output_buffer

async def _drain_available_output(
    stream: asyncio.StreamReader | None,
    *,
    read_timeout: float = 0.05,
) -> bytearray:
    """非阻塞式读取所有可用输出。"""
    output_buffer = bytearray()
    if stream is None:
        return output_buffer
    while True:
        try:
            chunk = await asyncio.wait_for(stream.read(65536), timeout=read_timeout)
        except asyncio.TimeoutError:
            return output_buffer
        if not chunk:
            return output_buffer
        output_buffer.extend(chunk)

两个读取函数:

  • _read_remaining_output(): 读取所有剩余数据,带超时
  • _drain_available_output(): 非阻塞读取,快速获取已缓冲的数据
4.3.6 输出格式化
def _format_output(output_buffer: bytearray) -> str:
    """格式化输出缓冲区。"""
    # 解码为 UTF-8,替换无法解码的字节
    text = output_buffer.decode("utf-8", errors="replace").replace("\r\n", "\n").strip()
    
    if not text:
        return "(no output)"  # 无输出提示
    
    if len(text) > 12000:
        return f"{text[:12000]}\n...[truncated]..."  # 截断长输出
    
    return text

格式化逻辑:

  • 解码 UTF-8,处理编码错误
  • 统一换行符为 \n
  • 空输出显示 (no output)
  • 超过 12000 字符截断
4.3.7 交互式命令检测
def _preflight_interactive_command(command: str) -> str | None:
    """预检查可能的交互式命令。"""
    lowered_command = command.lower()
    if not _looks_like_interactive_scaffold(lowered_command):
        return None
    
    return (
        "This command appears to require interactive input... "
        "Prefer non-interactive flags (for example --yes, -y, ...)"
    )

def _looks_like_interactive_scaffold(lowered_command: str) -> bool:
    """检测是否像是需要交互的脚手架命令。"""
    scaffold_markers = (
        "create-next-app",
        "npm create ",
        "pnpm create ",
        "yarn create ",
        "bun create ",
        "pnpm dlx ",
        "npm init ",
        ...
    )
    non_interactive_markers = (
        "--yes", " -y", "--skip-install",
        "--defaults", "--non-interactive", "--ci",
    )
    
    # 包含脚手架标记且不包含非交互标记
    return any(marker in lowered_command for marker in scaffold_markers) \
           and not any(marker in lowered_command for marker in non_interactive_markers)

交互式命令检测:

  • 检测 npm createcreate-next-app 等常见交互式命令
  • 检查是否已包含 --yes-y 等非交互标志
  • 如果可能需要交互,提前返回错误提示

这个设计非常贴心,避免了很多新手容易犯的错误。


第五章:查询执行流程深度剖析

5.1 QueryContext 数据类

src/openharness/engine/query.py 定义了查询执行的核心逻辑。首先是 QueryContext 数据类:

@dataclass
class QueryContext:
    """Context shared across a query run."""
    api_client: SupportsStreamingMessages
    tool_registry: ToolRegistry
    permission_checker: PermissionChecker
    cwd: Path
    model: str
    system_prompt: str
    max_tokens: int
    context_window_tokens: int | None = None
    auto_compact_threshold_tokens: int | None = None
    permission_prompt: PermissionPrompt | None = None
    ask_user_prompt: AskUserPrompt | None = None
    max_turns: int | None = 200
    hook_executor: HookExecutor | None = None
    tool_metadata: dict[str, object] | None = None

QueryContext 是查询执行的上下文容器,包含了执行一次查询所需的所有依赖。使用 dataclass 简化定义。

5.2 工具元数据管理

OpenHarness 使用工具元数据(tool_metadata)在工具调用之间传递状态。这部分代码展示了如何管理这种 “记忆”:

5.2.1 任务焦点状态
def _task_focus_state(tool_metadata: dict[str, object] | None) -> dict[str, object]:
    """获取或初始化任务焦点状态。"""
    if tool_metadata is None:
        return {}
    
    value = tool_metadata.setdefault(
        "task_focus_state",
        {
            "goal": "",              # 当前目标
            "recent_goals": [],      # 最近的目标历史
            "active_artifacts": [],  # 活跃的工件(文件、URL等)
            "verified_state": [],    # 已验证的状态
            "next_step": "",         # 下一步计划
        },
    )
    ...

任务焦点状态跟踪 Agent 的 “思维状态”:

  • goal: 当前正在处理的目标
  • recent_goals: 最近处理过的目标(防止重复)
  • active_artifacts: 正在处理的文件、URL 等
  • verified_state: 已确认完成的工作
  • next_step: 计划中的下一步
5.2.2 记忆用户目标
def remember_user_goal(
    tool_metadata: dict[str, object] | None,
    prompt: str,
) -> None:
    """记录用户目标到元数据。"""
    state = _task_focus_state(tool_metadata)
    summary = _summarize_focus_text(prompt)
    if not summary:
        return
    
    recent_goals = state.setdefault("recent_goals", [])
    if isinstance(recent_goals, list):
        _append_capped_unique(recent_goals, summary, limit=MAX_TRACKED_USER_GOALS)
    state["goal"] = summary

当用户提交新消息时,提取并记录目标。使用 _append_capped_unique() 维护一个有限长度的历史列表。

5.2.3 记忆已读文件
def _remember_read_file(
    tool_metadata: dict[str, object] | None,
    *,
    path: str,
    offset: int,
    limit: int,
    output: str,
) -> None:
    """记录文件读取操作。"""
    bucket = _tool_metadata_bucket(tool_metadata, "read_file_state")
    
    # 提取前 6 行非空内容作为预览
    preview_lines = [line.strip() for line in output.splitlines()[:6] if line.strip()]
    
    entry = {
        "path": path,
        "span": f"lines {offset + 1}-{offset + limit}",
        "preview": " | ".join(preview_lines)[:320],
        "timestamp": time.time(),
    }
    
    # 去重:删除同一文件的旧记录
    bucket[:] = [
        existing
        for existing in bucket
        if not isinstance(existing, dict) or str(existing.get("path") or "") != path
    ]
    bucket.append(entry)
    
    # 限制历史长度
    if len(bucket) > MAX_TRACKED_READ_FILES:
        del bucket[:-MAX_TRACKED_READ_FILES]

记录文件读取:

  • 文件路径
  • 读取的行范围
  • 内容预览(前 6 行)
  • 时间戳

保持最近 6 个文件的读取记录,用于后续决策。

5.2.4 记录工具执行
def _record_tool_carryover(
    context: QueryContext,
    *,
    tool_name: str,
    tool_input: dict[str, object],
    tool_output: str,
    tool_result_metadata: dict[str, object] | None,
    is_error: bool,
    resolved_file_path: str | None,
) -> None:
    """记录工具执行到元数据,供后续工具参考。"""
    if is_error:
        return  # 不记录错误
    
    # 记录文件操作为活跃工件
    if resolved_file_path is not None:
        _remember_active_artifact(context.tool_metadata, resolved_file_path)
    
    # 根据工具类型记录特定信息
    if tool_name == "read_file":
        _remember_read_file(...)
        _remember_verified_work(...)
    elif tool_name == "skill":
        _remember_skill_invocation(...)
    elif tool_name in {"agent", "send_message"}:
        _remember_async_agent_activity(...)
    elif tool_name == "bash":
        _remember_verified_work(...)
    ...

根据工具类型记录不同的信息:

  • read_file: 记录文件读取历史
  • skill: 记录技能调用
  • agent: 记录异步 Agent 活动
  • bash: 记录命令执行

这种设计让 Agent 能够 “记住” 它做过什么,避免重复工作。

5.3 run_query 核心循环

run_query() 是 OpenHarness 最核心的函数,实现了完整的 Agent 循环。

5.3.1 函数签名与初始化
async def run_query(
    context: QueryContext,
    messages: list[ConversationMessage],
) -> AsyncIterator[tuple[StreamEvent, UsageSnapshot | None]]:
    """Run the conversation loop until the model stops requesting tools.

    Auto-compaction is checked at the start of each turn. When the
    estimated token count exceeds the model's auto-compact threshold,
    the engine first tries a cheap microcompact (clearing old tool result
    content) and, if that is not enough, performs a full LLM-based
    summarization of older messages.
    """
    from openharness.services.compact import (
        AutoCompactState,
        auto_compact_if_needed,
    )

    compact_state = AutoCompactState()
    reactive_compact_attempted = False
    last_compaction_result: tuple[list[ConversationMessage], bool] = (messages, False)
    
    # 计算安全的 max_tokens
    effective_max_tokens = _bounded_completion_tokens(
        context.max_tokens,
        context.context_window_tokens,
    )
    reported_token_clamp = False

初始化:

  • compact_state: 自动压缩状态
  • effective_max_tokens: 考虑上下文窗口后的安全 token 限制
5.3.2 自动压缩内部函数
    async def _stream_compaction(
        *,
        trigger: str,
        force: bool = False,
    ) -> AsyncIterator[tuple[StreamEvent, UsageSnapshot | None]]:
        """执行自动压缩,流式返回进度事件。"""
        nonlocal last_compaction_result
        progress_queue: asyncio.Queue[CompactProgressEvent] = asyncio.Queue()

        async def _progress(event: CompactProgressEvent) -> None:
            await progress_queue.put(event)

        # 创建压缩任务
        task = asyncio.create_task(
            auto_compact_if_needed(
                messages,
                api_client=context.api_client,
                model=context.model,
                system_prompt=context.system_prompt,
                state=compact_state,
                progress_callback=_progress,
                force=force,
                trigger=trigger,
                ...
            )
        )
        
        # 流式返回进度事件
        while True:
            try:
                event = await asyncio.wait_for(progress_queue.get(), timeout=0.05)
                yield event, None
            except asyncio.TimeoutError:
                if task.done():
                    break
                continue
        
        # 获取最终结果
        last_compaction_result = await task

自动压缩机制:

  • 当对话 token 数超过阈值时触发
  • 先尝试 “微压缩”(清理旧工具结果)
  • 如果不够,使用 LLM 进行完整摘要
  • 流式返回进度,保持 UI 响应
5.3.3 主循环
    turn_count = 0
    while context.max_turns is None or turn_count < context.max_turns:
        turn_count += 1
        
        # 1. 检查并执行自动压缩
        async for event, usage in _stream_compaction(trigger="auto"):
            yield event, usage
        messages, was_compacted = last_compaction_result
        
        # 2. 图像预处理(非多模态模型)
        async for event in _preprocess_images_in_messages(messages, context):
            yield event, None
        
        final_message: ConversationMessage | None = None
        usage = UsageSnapshot()

每轮开始:

  1. 自动压缩(如果需要)
  2. 图像转文本(非多模态模型)
5.3.4 调用模型 API
        try:
            async for event in context.api_client.stream_message(
                ApiMessageRequest(
                    model=context.model,
                    messages=messages,
                    system_prompt=context.system_prompt,
                    max_tokens=effective_max_tokens,
                    tools=context.tool_registry.to_api_schema(),
                )
            ):
                if isinstance(event, ApiTextDeltaEvent):
                    # 文本增量事件
                    yield AssistantTextDelta(text=event.text), None
                    continue
                    
                if isinstance(event, ApiRetryEvent):
                    # 重试事件
                    yield StatusEvent(
                        message=f"Request failed; retrying in {event.delay_seconds:.1f}s..."
                    ), None
                    continue

                if isinstance(event, ApiMessageCompleteEvent):
                    # 消息完成事件
                    final_message = event.message
                    usage = event.usage
                    
        except Exception as exc:
            # 错误处理:token 限制、提示过长、网络错误等
            ...

流式调用模型 API:

  • 发送当前对话历史
  • 流式返回文本增量
  • 处理重试事件
  • 捕获完成事件
5.3.5 处理模型响应
        if final_message is None:
            raise RuntimeError("Model stream finished without a final message")

        # 处理协调器上下文(多 Agent 场景)
        coordinator_context_message: ConversationMessage | None = None
        if context.system_prompt.startswith("You are a **coordinator**."):
            ...

        # 过滤空消息
        if final_message.role == "assistant" and final_message.is_effectively_empty():
            yield ErrorEvent(message="Model returned an empty assistant message."), usage
            return

        # 追加到对话历史
        messages.append(final_message)
        yield AssistantTurnComplete(message=final_message, usage=usage), usage

处理响应:

  • 检查消息是否为空
  • 追加到历史
  • 返回完成事件
5.3.6 检查工具调用
        if not final_message.tool_uses:
            # 没有工具调用,对话结束
            if context.hook_executor is not None:
                await context.hook_executor.execute(
                    HookEvent.STOP,
                    {"event": HookEvent.STOP.value, "stop_reason": "tool_uses_empty"},
                )
            return

        tool_calls = final_message.tool_uses

检查模型是否请求了工具调用。如果没有,结束对话。

5.3.7 执行工具
        if len(tool_calls) == 1:
            # 单个工具:顺序执行
            tc = tool_calls[0]
            yield ToolExecutionStarted(tool_name=tc.name, tool_input=tc.input), None
            result = await _execute_tool_call(context, tc.name, tc.id, tc.input)
            yield ToolExecutionCompleted(
                tool_name=tc.name,
                output=result.content,
                is_error=result.is_error,
            ), None
            tool_results = [result]
        else:
            # 多个工具:并发执行
            for tc in tool_calls:
                yield ToolExecutionStarted(tool_name=tc.name, tool_input=tc.input), None

            async def _run(tc):
                return await _execute_tool_call(context, tc.name, tc.id, tc.input)

            # 并发执行,一个失败不影响其他
            raw_results = await asyncio.gather(
                *[_run(tc) for tc in tool_calls], return_exceptions=True
            )
            ...

工具执行策略:

  • 单个工具:顺序执行,立即返回结果
  • 多个工具:并发执行,提高性能
  • 使用 return_exceptions=True 确保一个工具失败不会导致其他工具被取消
5.3.8 追加工具结果并继续循环
        # 追加工具结果到对话历史
        messages.append(ConversationMessage(role="user", content=tool_results))

    # 超过最大轮数
    if context.max_turns is not None:
        raise MaxTurnsExceeded(context.max_turns)

将工具结果作为用户消息追加到历史,然后循环继续,让模型对结果做出响应。

5.4 工具执行函数

async def _execute_tool_call(
    context: QueryContext,
    tool_name: str,
    tool_use_id: str,
    tool_input: dict[str, object],
) -> ToolResultBlock:
    """执行单个工具调用。"""
    
    # 1. 执行 pre_tool_use 钩子
    if context.hook_executor is not None:
        pre_hooks = await context.hook_executor.execute(...)
        if pre_hooks.blocked:
            return ToolResultBlock(..., is_error=True)

    # 2. 查找工具
    tool = context.tool_registry.get(tool_name)
    if tool is None:
        return ToolResultBlock(..., content=f"Unknown tool: {tool_name}", is_error=True)

    # 3. 校验输入
    try:
        parsed_input = tool.input_model.model_validate(tool_input)
    except Exception as exc:
        return ToolResultBlock(..., content=f"Invalid input: {exc}", is_error=True)

    # 4. 权限检查
    decision = context.permission_checker.evaluate(...)
    if not decision.allowed:
        if decision.requires_confirmation and context.permission_prompt is not None:
            confirmed = await context.permission_prompt(tool_name, decision.reason)
            if not confirmed:
                return ToolResultBlock(..., is_error=True)
        else:
            return ToolResultBlock(..., is_error=True)

    # 5. 执行工具
    result = await tool.execute(parsed_input, ToolExecutionContext(...))
    
    # 6. 记录到元数据
    _record_tool_carryover(context, ...)
    
    # 7. 执行 post_tool_use 钩子
    if context.hook_executor is not None:
        await context.hook_executor.execute(HookEvent.POST_TOOL_USE, ...)
    
    return ToolResultBlock(...)

工具执行流程:

  1. 执行前置钩子
  2. 查找工具实现
  3. Pydantic 校验输入
  4. 权限检查(可能需要用户确认)
  5. 实际执行工具
  6. 记录执行历史到元数据
  7. 执行后置钩子
  8. 返回结果

第六章:ohmo 个人 Agent 系统

6.1 ohmo 的定位

ohmo 是 OpenHarness 提供的个人 Agent 应用,构建在核心框架之上。与 oh 命令不同,ohmo 专注于个人助理场景:

  • 长期记忆:记住用户偏好、工作习惯
  • 人格定制:通过 soul.md 定义 Agent 性格
  • IM 集成:通过 Gateway 接入 Telegram、Slack、Discord、飞书
  • 会话持久化:跨对话保持上下文

6.2 工作空间系统

ohmo/workspace.py 实现了完整的工作空间管理。

6.2.1 工作空间目录结构
WORKSPACE_DIRNAME = ".ohmo"

# 初始化时创建的文件:
# ~/.ohmo/
#   ├── soul.md           # Agent 人格定义
#   ├── identity.md       # Agent 身份
#   ├── user.md          # 用户画像
#   ├── BOOTSTRAP.md     # 首次引导
#   ├── state.json       # 状态文件
#   ├── gateway.json     # 网关配置
#   ├── memory/          # 记忆目录
#   │   ├── MEMORY.md    # 记忆索引
#   │   └── *.md         # 具体记忆文件
#   ├── skills/          # 技能目录
#   ├── plugins/         # 插件目录
#   ├── sessions/        # 会话存档
#   └── logs/            # 日志目录
6.2.2 核心模板

SOUL_TEMPLATE:定义 Agent 的核心价值观

SOUL_TEMPLATE = """# SOUL.md - Who You Are

You are ohmo, a personal agent built on top of OpenHarness.

## Core truths

- Be genuinely helpful, not performatively helpful.
  Skip filler like "great question" or "happy to help" unless it is actually
  natural in context.
- Have judgment.
  You can prefer one option over another, notice tradeoffs, and explain your
  reasons plainly.
- Be resourceful before asking.
  Read the file, check the context, inspect the state, and try to figure things
  out before bouncing work back to the user.
- Earn trust through competence.
  Be careful with anything public, destructive, costly, or user-facing.
  Be bolder with internal investigation, drafting, organizing, and synthesis.
- Remember that access is intimacy.
  Messages, files, notes, and history are personal. Treat them with respect.

## Boundaries

- Private things stay private.
- When in doubt, ask before acting externally.
- Do not send half-baked replies on messaging channels.
- In groups, do not casually speak as if you are the user.
- Do not optimize for flattery; optimize for usefulness, honesty, and good taste.

## Vibe

Be concise when the answer is simple. Be thorough when the stakes are high.
Sound like a capable companion with taste, not a corporate support bot.
"""

这个模板定义了 ohmo 的行为准则:

  • 真诚帮助:不做作,不说空话
  • 有判断力:能做出选择,解释原因
  • 自主解决:先尝试自己解决,不轻易打扰用户
  • 建立信任:谨慎处理公开、破坏性操作
  • 尊重隐私:把用户数据当作私密信息

USER_TEMPLATE:用户画像

USER_TEMPLATE = """# user.md - About Your Human

Learn the person you are helping. Keep this useful, respectful, and current.

## Profile

- Name:
- What to call them:
- Pronouns: *(optional)*
- Timezone:
- Languages:

## Defaults

- Preferred tone:
- Preferred answer length:
- Decision style:
- Typical working hours:

## Ongoing context

- Main projects:
- Recurring responsibilities:
- Current pressures or priorities:
- Tools and platforms they use often:

## Preferences

- What they usually want more of:
- What tends to annoy them:
- What they want handled carefully:
- What kinds of reminders or follow-through help them:

## Relationship notes

How should ohmo show up for this user over time?
What kind of assistant relationship feels right: terse operator, thoughtful
partner, organized chief of staff, calm technical companion, or something else?
"""

用户画像模板收集:

  • 基本信息:名字、称呼、时区、语言
  • 偏好设置:语调、回答长度、决策风格
  • 工作背景:主要项目、职责、使用的工具
  • 互动偏好:喜欢什么、讨厌什么、需要什么样的提醒

这种设计让 Agent 能够真正了解用户,提供个性化服务。

6.2.3 工作空间初始化
def initialize_workspace(workspace: str | Path | None = None) -> Path:
    """Create the workspace and seed template files when missing."""
    root = ensure_workspace(workspace)
    
    # 模板文件映射
    templates = {
        get_soul_path(root): SOUL_TEMPLATE,
        get_user_path(root): USER_TEMPLATE,
        get_memory_index_path(root): MEMORY_INDEX_TEMPLATE,
        get_identity_path(root): IDENTITY_TEMPLATE,
    }
    
    # 创建缺失的模板文件
    for path, content in templates.items():
        if not path.exists():
            path.write_text(content.strip() + "\n", encoding="utf-8")
    
    # 初始化 state.json
    state_path = get_state_path(root)
    state_data = {"app": "ohmo", "workspace": str(root.resolve())}
    if not state_path.exists():
        state_path.write_text(json.dumps(state_data, indent=2) + "\n", encoding="utf-8")
    
    # 首次引导标记
    bootstrap_path = get_bootstrap_path(root)
    if not state_data.get("bootstrap_seeded"):
        state_data["bootstrap_seeded"] = True
        if not bootstrap_path.exists():
            bootstrap_path.write_text(BOOTSTRAP_TEMPLATE.strip() + "\n", encoding="utf-8")
        state_path.write_text(json.dumps(state_data, indent=2) + "\n", encoding="utf-8")
    
    # 初始化网关配置
    gateway_path = get_gateway_config_path(root)
    if not gateway_path.exists():
        gateway_path.write_text(
            json.dumps(
                {
                    "provider_profile": "codex",
                    "enabled_channels": [],
                    "session_routing": "chat-thread",
                    "send_progress": True,
                    "send_tool_hints": True,
                    "permission_mode": "default",
                    "sandbox_enabled": False,
                    "allow_remote_admin_commands": False,
                    ...
                },
                indent=2,
            ) + "\n",
            encoding="utf-8",
        )
    return root

初始化流程:

  1. 创建工作空间目录结构
  2. 生成缺失的模板文件(soul.md、user.md 等)
  3. 初始化 state.json
  4. 首次引导标记和 BOOTSTRAP.md
  5. 生成默认网关配置

6.3 记忆系统

ohmo/memory.py 实现了个人记忆管理。

6.3.1 记忆文件管理
def list_memory_files(workspace: str | Path | None = None) -> list[Path]:
    """List `.ohmo` memory markdown files."""
    memory_dir = get_memory_dir(workspace)
    return sorted(path for path in memory_dir.glob("*.md") if path.name != "MEMORY.md")

列出记忆目录下的所有 Markdown 文件(排除索引文件)。

def add_memory_entry(workspace: str | Path | None, title: str, content: str) -> Path:
    """Create a personal memory file and append it to MEMORY.md."""
    memory_dir = get_memory_dir(workspace)
    memory_dir.mkdir(parents=True, exist_ok=True)
    
    # 生成 slug 文件名
    slug = sub(r"[^a-zA-Z0-9]+", "_", title.strip().lower()).strip("_") or "memory"
    path = memory_dir / f"{slug}.md"
    path.write_text(content.strip() + "\n", encoding="utf-8")

    # 更新索引
    index_path = get_memory_index_path(workspace)
    existing = index_path.read_text(encoding="utf-8") if index_path.exists() else "# Memory Index\n"
    if path.name not in existing:
        existing = existing.rstrip() + f"\n- [{title}]({path.name})\n"
        index_path.write_text(existing, encoding="utf-8")
    return path

添加记忆的流程:

  1. 将标题转换为 slug(如 “My Preferences” → “my_preferences”)
  2. 创建 Markdown 文件
  3. 更新 MEMORY.md 索引
def remove_memory_entry(workspace: str | Path | None, name: str) -> bool:
    """Delete a memory file and remove its index entry."""
    memory_dir = get_memory_dir(workspace)
    matches = [path for path in memory_dir.glob("*.md") if path.stem == name or path.name == name]
    if not matches:
        return False
    path = matches[0]
    path.unlink(missing_ok=True)

    # 从索引中移除
    index_path = get_memory_index_path(workspace)
    if index_path.exists():
        lines = [line for line in index_path.read_text(encoding="utf-8").splitlines() if path.name not in line]
        index_path.write_text("\n".join(lines).rstrip() + "\n", encoding="utf-8")
    return True

删除记忆时同时删除文件和索引条目。

6.3.2 加载记忆到提示词
def load_memory_prompt(workspace: str | Path | None = None, *, max_files: int = 5) -> str | None:
    """Return a prompt section describing personal memory."""
    memory_dir = get_memory_dir(workspace)
    index_path = get_memory_index_path(workspace)
    
    lines = [
        "# ohmo Memory",
        f"- Personal memory directory: {memory_dir}",
        "- Use this memory for stable user preferences and durable personal context.",
    ]

    # 添加索引内容
    if index_path.exists():
        index_lines = index_path.read_text(encoding="utf-8").splitlines()[:200]
        lines.extend(["", "## MEMORY.md", "```md", *index_lines, "```"])

    # 添加具体记忆文件(最多 5 个)
    for path in list_memory_files(workspace)[:max_files]:
        content = path.read_text(encoding="utf-8", errors="replace").strip()
        if not content:
            continue
        lines.extend(["", f"## {path.name}", "```md", content[:4000], "```"])

    return "\n".join(lines)

将记忆加载到提示词中:

  1. 包含索引概览
  2. 包含最近 5 个记忆文件的完整内容(截断到 4000 字符)
  3. 格式化 Markdown,便于模型理解

这种设计让 Agent 在每次对话时都能 “记起” 用户的重要信息。

6.4 Gateway IM 网关

ohmo/gateway/ 目录实现了 IM(即时通讯)网关,让 Agent 能接入各种聊天平台。

6.4.1 网关服务
class OhmoGatewayService:
    """Foreground/background service wrapper for the personal gateway."""

    def __init__(self, cwd: str | Path | None = None, workspace: str | Path | None = None):
        self._cwd = str(Path(cwd or Path.cwd()).resolve())
        self._workspace = workspace
        os.chdir(self._cwd)
        
        # 初始化工作空间
        root = initialize_workspace(self._workspace)
        os.environ["OHMO_WORKSPACE"] = str(root)
        
        # 加载配置
        self._config = load_gateway_config(self._workspace)
        
        # 创建消息总线
        self._bus = MessageBus()
        
        # 创建运行时池(管理多个并发会话)
        self._runtime_pool = OhmoSessionRuntimePool(
            cwd=self._cwd,
            workspace=self._workspace,
            provider_profile=self._config.provider_profile,
        )
        
        # 创建桥接器
        self._bridge = OhmoGatewayBridge(
            bus=self._bus,
            runtime_pool=self._runtime_pool,
            restart_gateway=self.request_restart,
        )
        
        # 创建渠道管理器
        self._manager = ChannelManager(
            build_channel_manager_config(self._config), self._bus
        )

网关服务组件:

  • MessageBus: 消息总线,解耦各个模块
  • OhmoSessionRuntimePool: 会话运行时池,支持多用户并发
  • OhmoGatewayBridge: 桥接消息总线和会话运行时
  • ChannelManager: 管理各种 IM 渠道连接
6.4.2 支持的渠道
_SUPPORTED_CHANNELS = {
    "telegram": TelegramChannel,
    "slack": SlackChannel,
    "discord": DiscordChannel,
    "feishu": FeishuChannel,
}

目前支持 Telegram、Slack、Discord、飞书四个平台。

6.4.3 消息路由
# gateway/router.py
async def route_incoming_message(
    message: InboundMessage,
    session_pool: SessionPool,
) -> None:
    """Route an incoming message to the appropriate session."""
    
    # 根据 chat_id 查找或创建会话
    session_key = f"{message.channel}:{message.chat_id}"
    session = session_pool.get(session_key)
    
    if session is None:
        # 创建新会话
        session = await session_pool.create(
            key=session_key,
            channel=message.channel,
            chat_id=message.chat_id,
        )
    
    # 将消息添加到会话处理队列
    await session.handle_message(message)

路由逻辑:

  1. 根据渠道和聊天 ID 生成会话键
  2. 查找现有会话或创建新会话
  3. 将消息分发给会话处理

第七章:会话管理与记忆系统

7.1 会话存储

ohmo/session_storage.py 实现了会话持久化。

class OhmoSessionBackend:
    """File-based session storage for ohmo."""

    def __init__(self, workspace_root: Path) -> None:
        self._root = Path(workspace_root)
        self._sessions_dir = self._root / "sessions"
        self._sessions_dir.mkdir(parents=True, exist_ok=True)

    def _session_path(self, session_id: str) -> Path:
        """Return the file path for a session."""
        # 使用日期前缀组织文件,避免目录过大
        date_prefix = time.strftime("%Y%m")
        dir_path = self._sessions_dir / date_prefix
        dir_path.mkdir(parents=True, exist_ok=True)
        return dir_path / f"{session_id}.json"

    def save(self, session_id: str, data: dict[str, Any]) -> None:
        """Save session data to disk."""
        path = self._session_path(session_id)
        path.write_text(json.dumps(data, indent=2, default=str), encoding="utf-8")

    def load(self, session_id: str) -> dict[str, Any] | None:
        """Load session data from disk."""
        path = self._session_path(session_id)
        if not path.exists():
            return None
        return json.loads(path.read_text(encoding="utf-8"))

    def list_sessions(self) -> list[str]:
        """List all saved session IDs."""
        sessions = []
        for path in self._sessions_dir.rglob("*.json"):
            sessions.append(path.stem)
        return sessions

    def delete(self, session_id: str) -> bool:
        """Delete a saved session."""
        path = self._session_path(session_id)
        if not path.exists():
            return False
        path.unlink()
        return True

会话存储特点:

  • 文件存储:JSON 格式,便于查看和调试
  • 日期组织:按月份分子目录,避免单目录文件过多
  • 简单接口:save/load/list/delete 四个基本操作

7.2 会话恢复

async def resume_session(
    session_id: str,
    backend: OhmoSessionBackend,
    query_engine: QueryEngine,
) -> bool:
    """Resume a saved session."""
    data = backend.load(session_id)
    if data is None:
        return False
    
    # 恢复对话历史
    messages = [
        ConversationMessage.model_validate(msg)
        for msg in data.get("messages", [])
    ]
    query_engine.load_messages(messages)
    
    # 恢复工具元数据
    query_engine.tool_metadata.update(data.get("tool_metadata", {}))
    
    return True

会话恢复流程:

  1. 从文件加载会话数据
  2. 反序列化对话历史
  3. 恢复工具元数据(包括已读文件、目标等)
  4. 加载到 QueryEngine

这种设计让用户可以随时中断对话,之后从断点继续。


第八章:权限控制与安全机制

8.1 权限检查器

src/openharness/permissions/checker.py 实现了细粒度的权限控制。

@dataclass
class PermissionDecision:
    """Result of a permission check."""
    allowed: bool
    requires_confirmation: bool
    reason: str | None = None

class PermissionChecker:
    """Evaluates whether tool invocations are allowed."""

    def __init__(
        self,
        mode: PermissionMode = PermissionMode.DEFAULT,
        path_rules: list[PathRule] | None = None,
        denied_commands: list[str] | None = None,
    ):
        self._mode = mode
        self._path_rules = path_rules or []
        self._denied_commands = denied_commands or []

    def evaluate(
        self,
        tool_name: str,
        is_read_only: bool,
        file_path: str | None = None,
        command: str | None = None,
    ) -> PermissionDecision:
        """Evaluate whether a tool invocation is permitted."""
        
        # 1. 检查模式
        if self._mode == PermissionMode.UNRESTRICTED:
            return PermissionDecision(allowed=True, requires_confirmation=False)
        
        if self._mode == PermissionMode.READ_ONLY and not is_read_only:
            return PermissionDecision(
                allowed=False,
                requires_confirmation=False,
                reason="Read-only mode: write operations are not allowed",
            )
        
        # 2. 检查拒绝命令列表
        if command and any(dc in command for dc in self._denied_commands):
            return PermissionDecision(
                allowed=False,
                requires_confirmation=False,
                reason=f"Command contains denied pattern",
            )
        
        # 3. 检查路径规则
        if file_path:
            for rule in self._path_rules:
                if rule.matches(file_path):
                    if rule.action == "deny":
                        return PermissionDecision(
                            allowed=False,
                            requires_confirmation=False,
                            reason=f"Path blocked by rule: {rule.pattern}",
                        )
                    elif rule.action == "confirm":
                        return PermissionDecision(
                            allowed=True,
                            requires_confirmation=True,
                            reason=f"Path requires confirmation: {rule.pattern}",
                        )
        
        # 4. 根据工具和操作类型决定
        if is_read_only:
            # 只读操作默认允许
            return PermissionDecision(allowed=True, requires_confirmation=False)
        
        # 写操作需要确认(在非 unrestricted 模式下)
        return PermissionDecision(
            allowed=True,
            requires_confirmation=self._mode != PermissionMode.UNRESTRICTED,
            reason="Write operation requires confirmation",
        )

权限决策流程:

  1. 模式检查:unrestricted(无限制)、read-only(只读)、default(默认)
  2. 命令黑名单:检查是否包含危险命令
  3. 路径规则:根据文件路径匹配允许/拒绝/确认规则
  4. 操作类型:只读操作通常允许,写操作可能需要确认

8.2 路径规则

@dataclass
class PathRule:
    """A rule for controlling access to file paths."""
    pattern: str  # glob 或正则模式
    action: Literal["allow", "deny", "confirm"]
    pattern_type: Literal["glob", "regex"] = "glob"

    def matches(self, path: str) -> bool:
        if self.pattern_type == "glob":
            return fnmatch.fnmatch(path, self.pattern)
        else:
            return re.search(self.pattern, path) is not None

路径规则示例:

path_rules:
  - pattern: "*.secret"
    action: deny
  - pattern: "/etc/*"
    action: confirm
  - pattern: "*.md"
    action: allow

8.3 交互式确认

当权限检查返回 requires_confirmation=True 时,系统会调用 permission_prompt 回调:

# 在 TUI 中
async def prompt_permission(tool_name: str, reason: str) -> bool:
    # 显示确认对话框
    confirmed = await show_confirmation_dialog(
        title=f"Allow {tool_name}?",
        message=reason,
    )
    return confirmed

# 在 CLI 中
async def prompt_permission(tool_name: str, reason: str) -> bool:
    # 命令行交互
    response = input(f"Allow {tool_name}? ({reason}) [y/N]: ")
    return response.lower() in ("y", "yes")

第九章:CLI 与命令系统

9.1 CLI 架构

src/openharness/cli.py 是 2423 行的巨型文件,包含所有 CLI 命令。使用 typer 库构建命令行界面。

import typer

app = typer.Typer(name="oh")

# 注册子命令
@app.command()
def setup():
    """统一配置入口"""
    ...

@app.command()
def provider_list():
    """列出可用的 provider"""
    ...

@app.command()
def auth_status():
    """查看认证状态"""
    ...

# 带参数的命令
@app.command()
def run(
    prompt: str = typer.Option(None, "--prompt", "-p", help="Single prompt to run"),
    model: str = typer.Option(None, "--model", "-m", help="Model to use"),
    max_turns: int = typer.Option(None, "--max-turns", help="Maximum agent turns"),
):
    """Run OpenHarness."""
    ...

9.2 Dry-run 模式

OpenHarness 提供了 --dry-run 模式,可以预览配置而不实际执行:

@app.command()
def dry_run(
    prompt: str = typer.Option(None, "--prompt", "-p"),
    output_format: str = typer.Option("text", "--output-format"),
):
    """Preview what would happen without executing."""
    
    # 加载配置
    settings = load_settings()
    
    # 检查认证状态
    auth_status = check_auth_status(settings)
    
    # 解析提示
    if prompt:
        # 检测是否为 slash command
        if prompt.startswith("/"):
            command_name = prompt[1:].split()[0]
            command_info = get_command_info(command_name)
            
            issues = []
            if command_info is None:
                issues.append(f"Unknown command: {command_name}")
            else:
                # 分析命令影响
                if command_name in MUTATING_COMMANDS:
                    issues.append(f"Command '{command_name}' modifies local state")
        else:
            # 普通提示词
            # 检测可能用到的 skills 和 tools
            likely_skills = detect_likely_skills(prompt)
            likely_tools = detect_likely_tools(prompt)
    
    # 检查 MCP 配置
    mcp_status = validate_mcp_servers(settings.mcp_servers)
    
    # 生成报告
    report = {
        "status": "ready" if not issues else "warning" if len(issues) < 3 else "blocked",
        "auth": auth_status,
        "skills": likely_skills,
        "tools": likely_tools,
        "mcp": mcp_status,
        "issues": issues,
        "next_actions": generate_next_actions(auth_status, issues),
    }
    
    print(format_report(report, output_format))

Dry-run 输出示例:

Status: ready

Authentication:
  ✓ Anthropic API key configured

Likely skills:
  - python-debugging
  - git-workflow

Likely tools:
  - read_file
  - bash
  - grep

MCP servers:
  ✓ filesystem (stdio)
  ✗ fetch: command not found: curl

Issues:
  ⚠ MCP server 'fetch' is not properly configured

Next actions:
  1. Fix or disable the broken MCP server: oh mcp disable fetch
  2. Run the prompt: oh -p "Explain this codebase"

9.3 Provider 管理

@app.command()
def provider_add(
    name: str = typer.Argument(..., help="Profile name"),
    label: str = typer.Option(..., help="Display label"),
    provider: str = typer.Option(..., help="Provider type (anthropic/openai)"),
    api_format: str = typer.Option(..., help="API format"),
    auth_source: str = typer.Option(..., help="Authentication source"),
    model: str = typer.Option(..., help="Default model"),
    base_url: str | None = typer.Option(None, help="Custom base URL"),
):
    """Add a custom provider profile."""
    ...

@app.command()
def provider_use(profile: str = typer.Argument(..., help="Profile name to activate")):
    """Switch to a provider profile."""
    ...

第十章:架构设计哲学总结

10.1 核心设计原则

原则 1:分离模型与装备(可以参考我以前的文章)
Agent = Model + Harness

OpenHarness 的核心洞见是:模型(LLM)和装备(Harness)应该分离。

  • 模型负责:理解、推理、决策
  • 装备负责:工具执行、状态管理、错误处理、安全边界

这种分离带来:

  • 模型可替换:同一个装备可以搭配不同的模型
  • 行为可预测:装备提供确定性保证
  • 能力可扩展:通过插件扩展装备能力
原则 2:配置即代码

大量使用 Markdown 作为配置格式:

  • 人类可读
  • 版本友好
  • 易于编辑
# SOUL.md

## Core truths
- Be genuinely helpful
- Have judgment
- Be resourceful before asking
原则 3:分层解耦
Brain → Hands → Session

每层可以独立失败、恢复、扩展。

原则 4:流式优先

所有操作都是异步流式的:

  • UI 保持响应
  • 支持实时反馈
  • 易于取消

10.2 代码量分析

OpenHarness 如何实现 11,733 行代码复刻 Claude Code 98% 功能?

精简策略:

  1. 只做核心循环:Agent loop、工具执行、状态管理
  2. 插件化扩展:Skills、Plugins、MCP 协议支持外部扩展
  3. 复用生态:React TUI、Pydantic、typer 等成熟库
  4. 配置驱动:人格、记忆、技能都用配置而非代码

对比 Claude Code:

组件 OpenHarness Claude Code
核心引擎 ~2000 行 ~50000 行
工具系统 ~3000 行 ~80000 行
UI React + Ink 自建
集成 通过 MCP 内置大量集成

OpenHarness 通过 MCP 协议 将集成外包,保持核心精简。

10.3 适用场景

OpenHarness 适合:

  • 需要深度定制的场景
  • 私有化部署需求
  • 多模型切换需求
  • 学习和研究目的

Claude Code 适合:

  • 追求开箱即用
  • 信任官方托管
  • 不需要深度定制

10.4 未来展望

OpenHarness 的设计为未来的扩展预留了空间:

  1. 更多渠道:通过 Gateway 架构接入更多 IM 平台
  2. 更多模型:通过统一 API 客户端支持新模型
  3. 更多工具:通过 MCP 和插件系统扩展
  4. 更强记忆:通过记忆系统实现长期上下文

结语

通过对 OpenHarness 11733 行代码的逐行分析,我们看到了一个精心设计的 AI Agent 框架。它的成功不在于功能的堆砌,而在于:

  1. 清晰的架构:模型与装备分离,分层解耦
  2. 精简的核心:只做最必要的事,其他交给插件
  3. 配置驱动:用 Markdown 实现灵活定制
  4. 流式设计:异步优先,保持响应

这种设计哲学值得所有 AI Agent 开发者学习。

项目地址:https://github.com/HKUDS/OpenHarness


参考链接:

Logo

小龙虾开发者社区是 CSDN 旗下专注 OpenClaw 生态的官方阵地,聚焦技能开发、插件实践与部署教程,为开发者提供可直接落地的方案、工具与交流平台,助力高效构建与落地 AI 应用

更多推荐