这是第二篇文章,第三篇文章在我的blog:https://www.catchcat.eu.cc/post/claude-code%E6%9E%B6%E6%9E%84%E6%B7%B1%E5%BA%A6%E8%A7%A3%E8%AF%BB%E4%B8%89

s06: Context Compact (上下文压缩)(Harness 层: 压缩 – 干净的记忆, 无限的会话。)

问题

上下文窗口是有限的。读一个 1000 行的文件就吃掉 ~4000 token; 读 30 个文件、跑 20 条命令, 轻松突破 100k token。不压缩, Agent 根本没法在大项目里干活。

解决方案

三层压缩, 激进程度递增:

Every turn:
+------------------+
| Tool call result |
+------------------+
        |
        v
[Layer 1: micro_compact]        (silent, every turn)
  Replace tool_result > 3 turns old
  with "[Previous: used {tool_name}]"
        |
        v
[Check: tokens > 50000?]
   |               |
   no              yes
   |               |
   v               v
continue    [Layer 2: auto_compact]
              Save transcript to .transcripts/
              LLM summarizes conversation.
              Replace all messages with [summary].
                    |
                    v
            [Layer 3: compact tool]
              Model calls compact explicitly.
              Same summarization as auto_compact.

Layer1:micro_compact每轮必执行):保留最近 3 个工具结果,非 read_file 的旧结果替换为 [Previous: used 工具名]

Layer2:auto_compact:token>50000 自动总结(本次指令 token 不足,不触发)

Layer3:compact 工具:手动压缩(一般不给大模型说就不触发)

相对 s05 的变更

组件 之前 (s05) 之后 (s06)
Tools 5 5 (基础 + compact)
上下文管理 三层压缩
Micro-compact 旧结果 -> 占位符
Auto-compact token 阈值触发
Transcripts 保存到 .transcripts/

阶段 1:用户输入指令 → history = 1 条(初始状态)

[
  {
    "role": "user",
    "content": "逐一阅读agents/目录中的每个Python文件"
  }
]

阶段 2:进入 agent_loop → 执行 Layer1 micro_compact

代码执行:

micro_compact(messages)  # 此时只有1条消息,无tool_result → 不做任何修改

messages保持不变,token<50000 → 不触发 auto_compact


阶段 3:调用大模型 → 返回 tool_use(bash 列出文件)

大模型返回 stop_reason: tool_use,代码追加助手消息history = 2 条

[
  {
    "role": "user",
    "content": "逐一阅读agents/目录中的每个Python文件"
  },
  {
    "role": "assistant",
    "content": [
      {
        "type": "tool_use",
        "id": "tu_001",
        "name": "bash",
        "input": {
          "command": "ls agents/*.py"
        }
      }
    ]
  }
]

阶段 4:执行 bash 工具 → 追加 tool_resulthistory = 3 条

执行 run_bash,返回文件列表,封装结果追加:

[
  {
    "role": "user",
    "content": "逐一阅读agents/目录中的每个Python文件"
  },
  {
    "role": "assistant",
    "content": [{"type":"tool_use","id":"tu_001","name":"bash","input":{"command":"ls agents/*.py"}}]
  },
  {
    "role": "user",
    "content": [
      {
        "type": "tool_result",
        "tool_use_id": "tu_001",
        "content": "s05_agent.py\ns06_context_compact.py"
      }
    ]
  }
]

执行micro_compact:工具结果数量 = 1 < KEEP_RECENT=3 → 不压缩


阶段 5:第二轮循环 → LLM 调用 read_file 读第一个文件

追加助手消息 → history = 4 条

[
  // 前3条省略
  {
    "role": "assistant",
    "content": [
      {
        "type": "tool_use",
        "id": "tu_002",
        "name": "read_file",
        "input": {
          "path": "s05_agent.py"
        }
      }
    ]
  }
]

阶段 6:执行 read_file → 追加结果 → history = 5 条

read_file 受保护,不压缩,返回文件内容:

[
  // 前4条省略
  {
    "role": "user",
    "content": [
      {
        "type": "tool_result",
        "tool_use_id": "tu_002",
        "content": "import os\nimport re\n...(s05_agent.py完整代码)"
      }
    ]
  }
]

micro_compact:结果数量 = 2 < 3 → 不压缩


阶段 7:第三轮循环 → LLM 调用 read_file 读第二个文件

追加助手消息 → history = 6 条

[
  // 前5条省略
  {
    "role": "assistant",
    "content": [
      {
        "type": "tool_use",
        "id": "tu_003",
        "name": "read_file",
        "input": {
          "path": "s06_context_compact.py"
        }
      }
    ]
  }
]

阶段 8:执行 read_file → 追加结果 → history = 7 条

[
  // 前6条省略
  {
    "role": "user",
    "content": [
      {
        "type": "tool_result",
        "tool_use_id": "tu_003",
        "content": "import json\nimport os\n...(s06_context_compact.py完整代码)"
      }
    ]
  }
]

阶段 9:micro_compact 压缩逻辑触发演示(这个很关键)

当前工具结果列表:

  1. tu_001: bash(非保护工具)
  2. tu_002: read_file(保护)
  3. tu_003: read_file(保护)

总数量 = 3 = KEEP_RECENT=3 → 刚好不压缩

模拟:再多执行 1 个工具(达到 4 个,触发压缩)

如果再执行 1 个工具(tu_004: bash),工具结果 = 4,micro_compact自动替换最早的非保护结果

// 被压缩后的旧结果(tu_001 bash)
{
  "type": "tool_result",
  "tool_use_id": "tu_001",
  "content": "[Previous: used bash]"  // 被微压缩替换!
}

read_file 结果永久保留,不会被压缩


阶段 10:LLM 结束任务 → 最终 history

大模型返回 end_turn,追加文本回答,任务完成:

[
  // 前7条省略
  {
    "role": "assistant",
    "content": "已成功逐一阅读agents/目录下的所有Python文件:\n1. s05_agent.py(基础Agent)\n2. s06_context_compact.py(三层压缩Agent)"
  }
]

压缩过程也就是这个代码

THRESHOLD = 50000
TRANSCRIPT_DIR = WORKDIR / ".transcripts"
KEEP_RECENT = 3
PRESERVE_RESULT_TOOLS = {"read_file"}


def estimate_tokens(messages: list) -> int:
    """Rough token count: ~4 chars per token."""
    return len(str(messages)) // 4


# -- Layer 1: micro_compact - replace old tool results with placeholders --
def micro_compact(messages: list) -> list:
    # Collect (msg_index, part_index, tool_result_dict) for all tool_result entries
    tool_results = []
    for msg_idx, msg in enumerate(messages):
        if msg["role"] == "user" and isinstance(msg.get("content"), list):
            for part_idx, part in enumerate(msg["content"]):
                if isinstance(part, dict) and part.get("type") == "tool_result":
                    tool_results.append((msg_idx, part_idx, part))
    if len(tool_results) <= KEEP_RECENT:
        return messages
    # Find tool_name for each result by matching tool_use_id in prior assistant messages
    tool_name_map = {}
    for msg in messages:
        if msg["role"] == "assistant":
            content = msg.get("content", [])
            if isinstance(content, list):
                for block in content:
                    if hasattr(block, "type") and block.type == "tool_use":
                        tool_name_map[block.id] = block.name
    # Clear old results (keep last KEEP_RECENT). Preserve read_file outputs because
    # they are reference material; compacting them forces the agent to re-read files.
    to_clear = tool_results[:-KEEP_RECENT]
    for _, _, result in to_clear:
        if not isinstance(result.get("content"), str) or len(result["content"]) <= 100:
            continue
        tool_id = result.get("tool_use_id", "")
        tool_name = tool_name_map.get(tool_id, "unknown")
        if tool_name in PRESERVE_RESULT_TOOLS:
            continue
        result["content"] = f"[Previous: used {tool_name}]"
    return messages


# -- Layer 2: auto_compact - save transcript, summarize, replace messages --
def auto_compact(messages: list) -> list:
    # Save full transcript to disk
    TRANSCRIPT_DIR.mkdir(exist_ok=True)
    transcript_path = TRANSCRIPT_DIR / f"transcript_{int(time.time())}.jsonl"
    with open(transcript_path, "w") as f:
        for msg in messages:
            f.write(json.dumps(msg, default=str) + "\n")
    print(f"[transcript saved: {transcript_path}]")
    # Ask LLM to summarize
    conversation_text = json.dumps(messages, default=str)[-80000:]
    response = client.messages.create(
        model=MODEL,
        messages=[{"role": "user", "content":
            "Summarize this conversation for continuity. Include: "
            "1) What was accomplished, 2) Current state, 3) Key decisions made. "
            "Be concise but preserve critical details.\n\n" + conversation_text}],
        max_tokens=2000,
    )
    summary = next((block.text for block in response.content if hasattr(block, "text")), "")
    if not summary:
        summary = "No summary generated."
    # Replace all messages with compressed summary
    return [
        {"role": "user", "content": f"[Conversation compressed. Transcript: {transcript_path}]\n\n{summary}"},
    ]

这里顺便解释一下enumerate这个函数(秒懂没!!!)(给 enumerate 传一个列表,遍历 / 转换后,每一项都是 (索引,元素) 的元组)

my_list = ["张三", "李四", "王五"]

result = enumerate(my_list) 

print(result)

结果:
[(0, '张三'), (1, '李四'), (2, '王五')]

整个压缩解读

这 4 个变量定义了压缩的所有规则,是整个逻辑的核心:

# 1. Token阈值:超过50000 token,触发【深度自动压缩】
THRESHOLD = 50000
# 2. 对话存档文件夹:压缩前把完整对话存到这里,防止数据丢失
TRANSCRIPT_DIR = WORKDIR / ".transcripts"
# 3. 微压缩保留数量:只保留最近3个工具结果,更早的直接压缩
KEEP_RECENT = 3
# 4. 保护工具:read_file的结果永远不压缩(读文件的内容必须保留)
PRESERVE_RESULT_TOOLS = {"read_file"}

辅助函数:estimate_tokens(计算对话长度)

作用:粗略估算当前对话的 token 数量,判断是否触发深度压缩

def estimate_tokens(messages: list) -> int:
    # 规则:1个token ≈ 4个英文字符,中文/符号也按这个粗略算,其实一个token对应的是1.3-1.5个汉字,这里粗滤结算
    return len(str(messages)) // 4

作用:给代码一个「判断依据」—— 对话太长(>50000)就压缩。


核心 1:Layer1 微压缩 micro_compact(每轮必执行!轻量级)

核心作用

不删除消息,只把「旧的、非读文件的工具结果」替换成简短占位符

  • 保留:最近 3 个工具结果 + 所有 read_file 结果
  • 压缩:更早的 bash/write/edit 工具结果 → 替换为 [Previous: used 工具名]

逐行代码拆解 + JSON 示例

第一步:收集所有「工具结果(tool_result)

tool_results = []
for msg_idx, msg in enumerate(messages):
    # 只找user角色、content是列表的消息
    if msg["role"] == "user" and isinstance(msg.get("content"), list):
        for part_idx, part in enumerate(msg["content"]):
            # 只收集 type="tool_result" 的工具结果
            if isinstance(part, dict) and part.get("type") == "tool_result":
                tool_results.append((消息索引, 内容索引, 工具结果字典))
【示例】收集到的工具结果(共 4 个,超过 3 个,触发压缩)
  1. tu_001:bash 结果
  2. tu_002:read_file 结果
  3. tu_003:write_file 结果
  4. tu_004:read_file 结果

第二步:建立「工具 ID → 工具名」映射

tool_name_map = {}
for msg in messages:
    if msg["role"] == "assistant":
        for block in content:
            if block.type == "tool_use":
                # 记录:工具ID → 工具名(如 tu_001 → bash)
                tool_name_map[block.id] = block.name

作用:知道每个工具结果对应的工具名,方便替换占位符。


第三步:执行压缩(核心!)

# 只保留最后3个,前面的全部标记为「待压缩」
to_clear = tool_results[:-KEEP_RECENT]

for _, _, result in to_clear:
    # 跳过:内容很短的结果 / 保护工具(read_file)的结果
    if 内容不是字符串 或 内容长度≤100: continue
    if 工具名是read_file: continue
    # 替换!核心压缩逻辑
    result["content"] = f"[Previous: used {tool_name}]"

微压缩 JSON 前后对比(最直观)

压缩前(原始长文本,占空间)

{
  "type": "tool_result",
  "tool_use_id": "tu_001",
  "content": "s05_agent.py\ns06_context_compact.py"  // 原始bash结果
}

压缩后(替换为占位符,极省空间)

{
  "type": "tool_result",
  "tool_use_id": "tu_001",
  "content": "[Previous: used bash]"  // 微压缩完成!
}

关键保护:read_file 结果完全不变(这个很重要,读的内容必须保护)

{
  "type": "tool_result",
  "tool_use_id": "tu_002",
  "content": "import os\nimport re\n...(完整文件代码)"  // 永久保留,不压缩
}

核心 2:Layer2 深度自动压缩 auto_compact(超 50000token 触发)

核心作用

对话太长时,直接把「整个对话」替换成「1 条 AI 总结」,实现无限会话

三步必执行:

  1. 存档:把完整对话保存到 .transcripts 文件夹(防止丢数据)
  2. 总结:让大模型把整个对话浓缩成一段文字
  3. 替换:把几十条消息 → 替换成1 条总结消息

逐行代码拆解 + JSON 示例(这个更直观!!看这里来了!!!)

第一步:存档完整对话

# 创建文件夹,把所有消息保存为jsonl文件
TRANSCRIPT_DIR.mkdir(exist_ok=True)
with open(transcript_时间戳.jsonl, "w") as f:
    f.write(完整对话json)

作用:压缩后也能找回原始对话。


第二步:让 AI 生成总结

# 给AI发指令:总结对话的成果、当前状态、关键决策
response = client.messages.create(
    prompt="总结对话,保留关键信息",
    messages=对话内容
)
summary = AI返回的总结文本

第三步:替换整个对话(终极压缩)

# 把N条消息 → 直接变成1条总结消息
return [
    {"role": "user", "content": f"[对话已压缩]\n\n{summary}"},
]

深度压缩 JSON 前后对比

压缩前(几十条消息,超长)

[
  {"role":"user","content":"逐一阅读agents目录文件"},
  {"role":"assistant","content":[{"type":"tool_use","name":"bash"}]},
  {"role":"user","content":[{"type":"tool_result","content":"[Previous: used bash]"}]},
  {"role":"assistant","content":[{"type":"tool_use","name":"read_file"}]},
  ... 省略10+条消息
]

压缩后(只剩 1 条消息,极小)

[
  {
    "role": "user",
    "content": "[Conversation compressed. Transcript: .transcripts/transcript_123456.jsonl]\n\n总结:已完成agents目录下2个Python文件的阅读,s05是基础Agent,s06是三层压缩Agent,无错误执行完成。"
  }
]

s07: Task System (任务系统)(Harness 层: 持久化任务 – 比任何一次对话都长命的目标)

问题

s03 的 TodoManager 只是内存中的扁平清单: 没有顺序、没有依赖、状态只有做完没做完。真实目标是有结构的 – 任务 B 依赖任务 A, 任务 C 和 D 可以并行, 任务 E 要等 C 和 D 都完成。

没有显式的关系, Agent 分不清什么能做、什么被卡住、什么能同时跑。而且清单只活在内存里, 上下文压缩 (s06) 一跑就没了。

解决方案

把扁平清单升级为持久化到磁盘的任务图。每个任务是一个 JSON 文件, 有状态、前置依赖 (blockedBy)。任务图随时回答三个问题:

  • 什么可以做? – 状态为 pendingblockedBy 为空的任务。

  • 什么被卡住? – 等待前置任务完成的任务。

  • 什么做完了? – 状态为 completed 的任务, 完成时自动解锁后续任务。

    其实就是s03的升级,之前是把所有的任务都串在一起执行,这里直接把有些任务并起来执行

.tasks/
  task_1.json  {"id":1, "status":"completed"}
  task_2.json  {"id":2, "blockedBy":[1], "status":"pending"}
  task_3.json  {"id":3, "blockedBy":[1], "status":"pending"}
  task_4.json  {"id":4, "blockedBy":[2,3], "status":"pending"}

任务图 (DAG):
                 +----------+
            +--> | task 2   | --+
            |    | pending  |   |
+----------+     +----------+    +--> +----------+
| task 1   |                          | task 4   |
| completed| --> +----------+    +--> | blocked  |
+----------+     | task 3   | --+     +----------+
                 | pending  |
                 +----------+

顺序:   task 1 必须先完成, 才能开始 2 和 3
并行:   task 2 和 3 可以同时执行
依赖:   task 4 要等 2 和 3 都完成
状态:   pending -> in_progress -> completed

这个任务图是 s07 之后所有机制的协调骨架: 后台执行 (s08)、多 agent 团队 (s09+)、worktree 隔离 (s12) 都读写这同一个结构。

核心代码:
class TaskManager:
    def __init__(self, tasks_dir: Path):
        self.dir = tasks_dir
        self.dir.mkdir(exist_ok=True)
        self._next_id = self._max_id() + 1

    def _max_id(self) -> int:
        ids = [int(f.stem.split("_")[1]) for f in self.dir.glob("task_*.json")]
        return max(ids) if ids else 0

    def _load(self, task_id: int) -> dict:
        path = self.dir / f"task_{task_id}.json"
        if not path.exists():
            raise ValueError(f"Task {task_id} not found")
        return json.loads(path.read_text())

    def _save(self, task: dict):
        path = self.dir / f"task_{task['id']}.json"
        path.write_text(json.dumps(task, indent=2, ensure_ascii=False))

    def create(self, subject: str, description: str = "") -> str:
        task = {
            "id": self._next_id, "subject": subject, "description": description,
            "status": "pending", "blockedBy": [], "owner": "",
        }
        self._save(task)
        self._next_id += 1
        return json.dumps(task, indent=2, ensure_ascii=False)

    def get(self, task_id: int) -> str:
        return json.dumps(self._load(task_id), indent=2, ensure_ascii=False)

    def update(self, task_id: int, status: str = None,
               add_blocked_by: list = None, remove_blocked_by: list = None) -> str:
        task = self._load(task_id)
        if status:
            if status not in ("pending", "in_progress", "completed"):
                raise ValueError(f"Invalid status: {status}")
            task["status"] = status
            if status == "completed":
                self._clear_dependency(task_id)
        if add_blocked_by:
            task["blockedBy"] = list(set(task["blockedBy"] + add_blocked_by))
        if remove_blocked_by:
            task["blockedBy"] = [x for x in task["blockedBy"] if x not in remove_blocked_by]
        self._save(task)
        return json.dumps(task, indent=2, ensure_ascii=False)

    def _clear_dependency(self, completed_id: int):
        """Remove completed_id from all other tasks' blockedBy lists."""
        for f in self.dir.glob("task_*.json"):
            task = json.loads(f.read_text())
            if completed_id in task.get("blockedBy", []):
                task["blockedBy"].remove(completed_id)
                self._save(task)

    def list_all(self) -> str:
        tasks = []
        files = sorted(
            self.dir.glob("task_*.json"),
            key=lambda f: int(f.stem.split("_")[1])
        )
        for f in files:
            tasks.append(json.loads(f.read_text()))
        if not tasks:
            return "No tasks."
        lines = []
        for t in tasks:
            marker = {"pending": "[ ]", "in_progress": "[>]", "completed": "[x]"}.get(t["status"], "[?]")
            blocked = f" (blocked by: {t['blockedBy']})" if t.get("blockedBy") else ""
            lines.append(f"{marker} #{t['id']}: {t['subject']}{blocked}")
        return "\n".join(lines)


TASKS = TaskManager(TASKS_DIR)

我输入:

创建3个任务:“设置项目”、“编写代码”、“编写测试”。按顺序相互依赖

大模型会自动调用 5 个工具(严格匹配代码的 TOOL_HANDLERS):

  1. task_create(设置项目)
  2. task_create(编写代码)
  3. task_create(编写测试)
  4. task_update(任务2, 添加依赖1)
  5. task_update(任务3, 添加依赖2)

一、核心方法 1:task_create(创建任务)

对应代码

def create(self, subject: str, description: str = "") -> str:
    # 1. 生成任务字典(固定字段)
    task = {
        "id": self._next_id, "subject": subject, "description": description,
        "status": "pending", "blockedBy": [], "owner": "",
    }
    # 2. 写入JSON文件
    self._save(task)
    # 3. 自增ID,下次创建用
    self._next_id += 1
    # 4. 返回完整JSON
    return json.dumps(task, indent=2, ensure_ascii=False)

执行动作(对应我的 3 个任务)

  1. 创建任务 1

    传入

    subject=设置项目
    
    • 生成 id=1blockedBy=[],写入 task_1.json
  2. 创建任务 2

    传入

    subject=编写代码
    
    • 生成 id=2blockedBy=[],写入 task_2.json
  3. 创建任务 3

    传入

    subject=编写测试
    
    • 生成 id=3blockedBy=[],写入 task_3.json

结果

生成 3 个空依赖的 JSON 文件,这是创建的底层逻辑

生成文件:.tasks/task_1.json

{
  "id": 1,
  "subject": "设置项目",
  "description": "",
  "status": "pending",
  "blockedBy": [],
  "owner": ""
}

生成文件:.tasks/task_2.json

{
  "id": 2,
  "subject": "编写代码",
  "description": "",
  "status": "pending",
  "blockedBy": [],
  "owner": ""
}

生成文件:.tasks/task_3.json

{
  "id": 3,
  "subject": "编写测试",
  "description": "",
  "status": "pending",
  "blockedBy": [],
  "owner": ""
}

二、核心方法 2:task_update(修改依赖 / 状态)

对应代码

def update(self, task_id: int, status=None, add_blocked_by=None, remove_blocked_by=None):
    # 1. 加载要修改的任务JSON
    task = self._load(task_id)
    # 2. 添加依赖:合并列表+去重
    if add_blocked_by:
        task["blockedBy"] = list(set(task["blockedBy"] + add_blocked_by))
    # 3. 保存修改后的JSON
    self._save(task)
    return json.dumps(task)

执行动作(配置顺序依赖)

  1. 修改任务 2

    :调用

    update(task_id=2, add_blocked_by=[1])
    
    • 加载 task_2.json
    • blockedBy: []合并成 [1]
    • 覆盖写入文件
  2. 修改任务 3

    :调用

    update(task_id=3, add_blocked_by=[2])
    
    • 加载 task_3.json
    • blockedBy: []合并成 [2]
    • 覆盖写入文件

结果

任务 2 依赖 1,任务 3 依赖 2,这是依赖配置的底层逻辑

修改文件:.tasks/task_2.json

完整最终 JSON(blockedBy 从 [] → [1])

{
  "id": 2,
  "subject": "编写代码",
  "description": "",
  "status": "pending",
  "blockedBy": [
    1
  ],
  "owner": ""
}

修改文件:.tasks/task_3.json

完整最终 JSON(blockedBy 从 [] → [2])

{
  "id": 3,
  "subject": "编写测试",
  "description": "",
  "status": "pending",
  "blockedBy": [
    2
  ],
  "owner": ""
}
.tasks/task_1.json(无变化)
{
  "id": 1,
  "subject": "设置项目",
  "description": "",
  "status": "pending",
  "blockedBy": [],
  "owner": ""
}

三、核心方法 3:task_list(列出所有任务)

对应代码(这是输出格式的唯一来源!)

def list_all(self) -> str:
    # 1. 遍历.tasks/下所有task_*.json,按ID排序
    files = sorted(self.dir.glob("task_*.json"), key=lambda f: int(f.stem.split("_")[1]))
    # 2. 读取所有任务
    for f in files:
        task = json.loads(f.read_text())
    # 3. 按固定格式拼接每行输出(核心!)
    marker = {"pending":"[ ]", "in_progress":"[>]", "completed":"[x]"}[task["status"]]
    blocked = f" (blocked by: {task['blockedBy']})" if task["blockedBy"] else ""
    lines.append(f"{marker} #{task['id']}: {task['subject']}{blocked}")
    # 4. 返回拼接好的字符串
    return "\n".join(lines)

执行动作

请添加图片描述

结果

格式是代码硬编码拼接的。!!!!!!!!

[ ] #1: 设置项目
[ ] #2: 编写代码 (blocked by: [1])
[ ] #3: 编写测试 (blocked by: [2])

四、核心方法 4:task_get(查询单个任务详情)

对应代码

def get(self, task_id: int) -> str:
    # 直接加载对应JSON,返回完整字符串
    return json.dumps(self._load(task_id), indent=2)

执行动作

调用 task_get(1) → 直接读取 task_1.json 并返回完整 JSON。


五、完整闭环:从指令 → 最终输出(一步一对应)

初始状态
.tasks/` 目录为空,`_next_id=1
第 1 步:调用 task_create (设置项目)
  • 代码生成 id=1 任务 → 写入 task_1.json
  • _next_id 变成 2
第 2 步:调用 task_create (编写代码)
  • 代码生成 id=2 任务 → 写入 task_2.json
  • _next_id 变成 3
第 3 步:调用 task_create (编写测试)
  • 代码生成 id=3 任务 → 写入 task_3.json
  • _next_id 变成 4
第 4 步:调用 task_update (2, add_blocked_by=[1])
  • 代码修改 task_2.jsonblockedBy: [1]
第 5 步:调用 task_update (3, add_blocked_by=[2])
  • 代码修改 task_3.jsonblockedBy: [2]
第 6 步:调用 task_list
[ ] #1: 设置项目
[ ] #2: 编写代码 (blocked by: [1])
[ ] #3: 编写测试 (blocked by: [2])

s08: Background Tasks (后台任务)(Harness 层: 后台执行 – 模型继续思考, harness 负责等待)

问题

有些命令要跑好几分钟: npm installpytestdocker build。阻塞式循环下模型只能干等。用户说 “装依赖, 顺便建个配置文件”, Agent 却只能一个一个来。

解决方案

Main thread                Background thread
+-----------------+        +-----------------+
| agent loop      |        | subprocess runs |
| ...             |        | ...             |
| [LLM call] <---+------- | enqueue(result) |
|  ^drain queue   |        +-----------------+
+-----------------+

Timeline:
Agent --[spawn A]--[spawn B]--[other work]----
             |          |
             v          v
          [A runs]   [B runs]      (parallel)
             |          |
             +-- results injected before next LLM call --+

相对 s07 的变更

组件 之前 (s07) 之后 (s08)
Tools 8 6 (基础 + background_run + check)
执行方式 仅阻塞 阻塞 + 后台线程
通知机制 每轮排空的队列
并发 守护线程

关键代码:

class MessageBus:
    def __init__(self, inbox_dir: Path):
        self.dir = inbox_dir
        self.dir.mkdir(parents=True, exist_ok=True)

    def send(self, sender: str, to: str, content: str,
             msg_type: str = "message", extra: dict = None) -> str:
        if msg_type not in VALID_MSG_TYPES:
            return f"Error: Invalid type '{msg_type}'. Valid: {VALID_MSG_TYPES}"
        msg = {
            "type": msg_type,
            "from": sender,
            "content": content,
            "timestamp": time.time(),
        }
        if extra:
            msg.update(extra)
        inbox_path = self.dir / f"{to}.jsonl"
        with open(inbox_path, "a") as f:
            f.write(json.dumps(msg) + "\n")
        return f"Sent {msg_type} to {to}"

    def read_inbox(self, name: str) -> list:
        inbox_path = self.dir / f"{name}.jsonl"
        if not inbox_path.exists():
            return []
        messages = []
        for line in inbox_path.read_text().strip().splitlines():
            if line:
                messages.append(json.loads(line))
        inbox_path.write_text("")
        return messages

    def broadcast(self, sender: str, content: str, teammates: list) -> str:
        count = 0
        for name in teammates:
            if name != sender:
                self.send(sender, name, content, "broadcast")
                count += 1
        return f"Broadcast to {count} teammates"


BUS = MessageBus(INBOX_DIR)

我输入指令:

在后台运行 "sleep 5 && echo done",同时创建一个 test.txt 文件,内容为 hello

history 初始 = 1 条(只有我的提问)

[
  {
    "role": "user",
    "content": "在后台运行 \"sleep 5 && echo done\",同时创建一个 test.txt 文件,内容为 hello"
  }
]

阶段 1:AI 调用 background_run 工具(后台执行 sleep 命令)

追加助手消息 → history = 2 条

[
  {
    "role": "user",
    "content": "在后台运行 \"sleep 5 && echo done\",同时创建一个 test.txt 文件,内容为 hello"
  },
  {
    "role": "assistant",
    "content": [
      {
        "type": "tool_use",
        "id": "tu_001",
        "name": "background_run",
        "input": {
          "command": "sleep 5 && echo done"
        }
      }
    ]
  }
]

BackgroundManager 数据变化(核心)

# tasks 记录本:新增任务,状态=运行中
self.tasks = {
    "8f2d9c7a": {
        "status": "running",
        "result": None,
        "command": "sleep 5 && echo done"
    }
}
# 消息箱:空
self._notification_queue = []

阶段 2:执行 background_run → 追加 tool_result → history = 3 条

返回后台任务启动提示,封装结果追加:

[
  {
    "role": "user",
    "content": "在后台运行 \"sleep 5 && echo done\",同时创建一个 test.txt 文件,内容为 hello"
  },
  {
    "role": "assistant",
    "content": [
      {
        "type": "tool_use",
        "id": "tu_001",
        "name": "background_run",
        "input": {
          "command": "sleep 5 && echo done"
        }
      }
    ]
  },
  {
    "role": "user",
    "content": [
      {
        "type": "tool_result",
        "tool_use_id": "tu_001",
        "content": "Background task 8f2d9c7a started: sleep 5 && echo done"
      }
    ]
  }
]

BackgroundManager 数据变化

无变化:任务仍在 running,消息箱为空

关键:主线程不卡顿!立刻可以执行创建文件的操作


阶段 3:AI 调用 write_file 工具(创建文件)

追加助手消息 → history = 4 条

[
  // 前3条省略
  {
    "role": "assistant",
    "content": [
      {
        "type": "tool_use",
        "id": "tu_002",
        "name": "write_file",
        "input": {
          "path": "test.txt",
          "content": "hello"
        }
      }
    ]
  }
]

阶段 4:执行 write_file → 追加 tool_result → history = 5 条

[
  // 前4条省略
  {
    "role": "user",
    "content": [
      {
        "type": "tool_result",
        "tool_use_id": "tu_002",
        "content": "Wrote 5 bytes"
      }
    ]
  }
]

此时状态

  1. test.txt 文件已创建完成
  2. 后台任务 8f2d9c7a 仍在 sleep 5 秒(running)
  3. BackgroundManager 消息箱依旧为空

阶段 5:5 秒后 → 后台任务自动完成

后台线程执行完毕,BackgroundManager 内部数据突变

# tasks 记录本:状态改为 completed,写入结果
self.tasks = {
    "8f2d9c7a": {
        "status": "completed",
        "result": "done",
        "command": "sleep 5 && echo done"
    }
}
# 消息箱:放入完成通知
self._notification_queue = [
    {
        "task_id": "8f2d9c7a",
        "status": "completed",
        "command": "sleep 5 && echo done",
        "result": "done"
    }
]

阶段 6:AI 第二轮循环 → drain 通知 → 追加背景结果消息 → history = 6 条

AI 调用 LLM 前,会把后台任务结果作为用户消息追加到历史:

[
  // 前5条省略
  {
    "role": "user",
    "content": "<background-results>\n[bg:8f2d9c7a] completed: done\n</background-results>"
  }
]

BackgroundManager 数据变化

消息箱被清空:self._notification_queue = []


阶段 7:LLM 接收通知 → 返回助手最终消息 → history = 7 条

AI 告知我后台任务完成,最终对话历史:

[
  // 前6条省略
  {
    "role": "assistant",
    "content": "后台任务已完成:sleep 5 && echo done,输出结果:done\n已成功创建文件 test.txt,内容为 hello"
  }
]

最终完整 history JSON

[
  {
    "role": "user",
    "content": "在后台运行 \"sleep 5 && echo done\",同时创建一个 test.txt 文件,内容为 hello"
  },
  {
    "role": "assistant",
    "content": [
      {
        "type": "tool_use",
        "id": "tu_001",
        "name": "background_run",
        "input": {
          "command": "sleep 5 && echo done"
        }
      }
    ]
  },
  {
    "role": "user",
    "content": [
      {
        "type": "tool_result",
        "tool_use_id": "tu_001",
        "content": "Background task 8f2d9c7a started: sleep 5 && echo done"
      }
    ]
  },
  {
    "role": "assistant",
    "content": [
      {
        "type": "tool_use",
        "id": "tu_002",
        "name": "write_file",
        "input": {
          "path": "test.txt",
          "content": "hello"
        }
      }
    ]
  },
  {
    "role": "user",
    "content": [
      {
        "type": "tool_result",
        "tool_use_id": "tu_002",
        "content": "Wrote 5 bytes"
      }
    ]
  },
  {
    "role": "user",
    "content": "<background-results>\n[bg:8f2d9c7a] completed: done\n</background-results>"
  },
  {
    "role": "assistant",
    "content": "后台任务已完成:sleep 5 && echo done,输出结果:done\n已成功创建文件 test.txt,内容为 hello"
  }
]

s09: Agent Teams (Agent 团队)(Harness 层: 团队邮箱 – 多个模型, 通过文件协调)

问题

Subagent (s04) 是一次性的: 生成、干活、返回摘要、消亡。没有身份, 没有跨调用的记忆。Background Tasks (s08) 能跑 shell 命令, 但做不了 LLM 引导的决策。

真正的团队协作需要三样东西: (1) 能跨多轮对话存活的持久 Agent, (2) 身份和生命周期管理, (3) Agent 之间的通信通道。

解决方案

Teammate lifecycle:
  spawn -> WORKING -> IDLE -> WORKING -> ... -> SHUTDOWN

Communication:
  .team/
    config.json           <- team roster + statuses
    inbox/
      alice.jsonl         <- append-only, drain-on-read
      bob.jsonl
      lead.jsonl

              +--------+    send("alice","bob","...")    +--------+
              | alice  | -----------------------------> |  bob   |
              | loop   |    bob.jsonl << {json_line}    |  loop  |
              +--------+                                +--------+
                   ^                                         |
                   |        BUS.read_inbox("alice")          |
                   +---- alice.jsonl -> read + drain ---------+

相对 s08 的变更

组件 之前 (s08) 之后 (s09)
Tools 6 9 (+spawn/send/read_inbox)
Agent 数量 单一 领导 + N 个队友
持久化 config.json + JSONL 收件箱
线程 后台命令 每线程完整 agent loop
生命周期 一次性 idle -> working -> idle
通信 message + broadcast

输入指令

生成 Alice(编码者)和 Bob(测试者)。让 Alice 给 Bob 发送一条消息

初始状态

history = 1 条(仅我的提问)

[
  {
    "role": "user",
    "content": "生成Alice(编码者)和Bob(测试者)。让Alice给Bob发送一条消息"
  }
]

阶段 1:AI 调用 spawn_teammate 工具(创建 Alice)

追加助手消息 → history = 2 条

[
  {
    "role": "user",
    "content": "生成Alice(编码者)和Bob(测试者)。让Alice给Bob发送一条消息"
  },
  {
    "role": "assistant",
    "content": [
      {
        "type": "tool_use",
        "id": "tu_001",
        "name": "spawn_teammate",
        "input": {
          "name": "Alice",
          "role": "编码者",
          "prompt": "我是编码者Alice,后续给测试者Bob发送消息"
        }
      }
    ]
  }
]

阶段 2:执行 spawn_teammate → 追加 tool_result → history = 3 条

创建 Alice 成功,返回结果,同步更新 .team/config.json

[
  // 前2条省略
  {
    "role": "user",
    "content": [
      {
        "type": "tool_result",
        "tool_use_id": "tu_001",
        "content": "Spawned 'Alice' (role: 编码者)"
      }
    ]
  }
]

团队文件变化

.team/config.json(新增 Alice,状态 = working)

{
  "team_name": "default",
  "members": [
    {
      "name": "Alice",
      "role": "编码者",
      "status": "working"
    }
  ]
}

收件箱:.team/inbox/alice.jsonl 空文件


阶段 3:AI 调用 spawn_teammate 工具(创建 Bob)

追加助手消息 → history = 4 条

[
  // 前3条省略
  {
    "role": "assistant",
    "content": [
      {
        "type": "tool_use",
        "id": "tu_002",
        "name": "spawn_teammate",
        "input": {
          "name": "Bob",
          "role": "测试者",
          "prompt": "我是测试者Bob,接收Alice的消息"
        }
      }
    ]
  }
]

阶段 4:执行 spawn_teammate → 追加 tool_result → history = 5 条

创建 Bob 成功,返回结果,同步更新 .team/config.json

[
  // 前4条省略
  {
    "role": "user",
    "content": [
      {
        "type": "tool_result",
        "tool_use_id": "tu_002",
        "content": "Spawned 'Bob' (role: 测试者)"
      }
    ]
  }
]

团队文件变化

.team/config.json(Alice+Bob,均为 working)

{
  "team_name": "default",
  "members": [
    {
      "name": "Alice",
      "role": "编码者",
      "status": "working"
    },
    {
      "name": "Bob",
      "role": "测试者",
      "status": "working"
    }
  ]
}

收件箱:.team/inbox/bob.jsonl 空文件


阶段 5:AI 调用 send_message 工具(指挥 Alice 发消息)

追加助手消息 → history = 6 条

(Lead 给 Alice 发指令:给 Bob 发消息)

[
  // 前5条省略
  {
    "role": "assistant",
    "content": [
      {
        "type": "tool_use",
        "id": "tu_003",
        "name": "send_message",
        "input": {
          "to": "Alice",
          "content": "给Bob发送一条消息:我是Alice,已准备好编码工作",
          "msg_type": "message"
        }
      }
    ]
  }
]

阶段 6:执行 send_message → 追加 tool_result → history = 7 条

消息发送成功,写入 Alice 的收件箱

[
  // 前6条省略
  {
    "role": "user",
    "content": [
      {
        "type": "tool_result",
        "tool_use_id": "tu_003",
        "content": "Sent message to Alice"
      }
    ]
  }
]

收件箱文件变化

.team/inbox/alice.jsonl(写入 Lead 发来的指令)

{"type":"message","from":"lead","content":"给Bob发送一条消息:我是Alice,已准备好编码工作","timestamp":1718888888.888}

阶段 7:Alice 线程自动读取收件箱 → 执行 send_message 发消息给 Bob

Alice 自动执行工具,消息写入 Bob 的收件箱

收件箱文件变化

.team/inbox/bob.jsonl(Alice 发给 Bob 的消息)

{"type":"message","from":"Alice","content":"我是Alice,已准备好编码工作","timestamp":1718888889.999}

配置文件变化

Alice 完成工作,状态变为 idle

.team/config.json
{
  "team_name": "default",
  "members": [
    {
      "name": "Alice",
      "role": "编码者",
      "status": "idle"
    },
    {
      "name": "Bob",
      "role": "测试者",
      "status": "working"
    }
  ]
}

阶段 8:AI 第二轮循环 → 读取 Lead 收件箱 → 追加消息 → history = 8 条

系统自动读取收件箱,把 Alice 的回执追加到对话历史

[
  // 前7条省略
  {
    "role": "user",
    "content": "<inbox>[{\"type\":\"message\",\"from\":\"Alice\",\"content\":\"已给Bob发送消息\",\"timestamp\":1718888890.000}]</inbox>"
  }
]

阶段 9:LLM 返回最终结果 → history = 9 条

AI 给出最终答复,流程结束

[
  // 前8条省略
  {
    "role": "assistant",
    "content": "已成功生成:\n1. Alice(编码者)\n2. Bob(测试者)\nAlice 已向 Bob 发送消息:我是Alice,已准备好编码工作"
  }
]

最终完整 history JSON

[
  {
    "role": "user",
    "content": "生成Alice(编码者)和Bob(测试者)。让Alice给Bob发送一条消息"
  },
  {
    "role": "assistant",
    "content": [{"type":"tool_use","id":"tu_001","name":"spawn_teammate","input":{"name":"Alice","role":"编码者","prompt":"我是编码者Alice,后续给测试者Bob发送消息"}}]
  },
  {
    "role": "user",
    "content": [{"type":"tool_result","tool_use_id":"tu_001","content":"Spawned 'Alice' (role: 编码者)"}]
  },
  {
    "role": "assistant",
    "content": [{"type":"tool_use","id":"tu_002","name":"spawn_teammate","input":{"name":"Bob","role":"测试者","prompt":"我是测试者Bob,接收Alice的消息"}}]
  },
  {
    "role": "user",
    "content": [{"type":"tool_result","tool_use_id":"tu_002","content":"Spawned 'Bob' (role: 测试者)"}]
  },
  {
    "role": "assistant",
    "content": [{"type":"tool_use","id":"tu_003","name":"send_message","input":{"to":"Alice","content":"给Bob发送一条消息:我是Alice,已准备好编码工作","msg_type":"message"}}]
  },
  {
    "role": "user",
    "content": [{"type":"tool_result","tool_use_id":"tu_003","content":"Sent message to Alice"}]
  },
  {
    "role": "user",
    "content": "<inbox>[{\"type\":\"message\",\"from\":\"Alice\",\"content\":\"已给Bob发送消息\",\"timestamp\":1718888890.000}]</inbox>"
  },
  {
    "role": "assistant",
    "content": "已成功生成:\n1. Alice(编码者)\n2. Bob(测试者)\nAlice 已向 Bob 发送消息:我是Alice,已准备好编码工作"
  }
]

核心代码:

class MessageBus:
    def __init__(self, inbox_dir: Path):
        self.dir = inbox_dir
        self.dir.mkdir(parents=True, exist_ok=True)

    def send(self, sender: str, to: str, content: str,
             msg_type: str = "message", extra: dict = None) -> str:
        if msg_type not in VALID_MSG_TYPES:
            return f"Error: Invalid type '{msg_type}'. Valid: {VALID_MSG_TYPES}"
        msg = {
            "type": msg_type,
            "from": sender,
            "content": content,
            "timestamp": time.time(),
        }
        if extra:
            msg.update(extra)
        inbox_path = self.dir / f"{to}.jsonl"
        with open(inbox_path, "a") as f:
            f.write(json.dumps(msg) + "\n")
        return f"Sent {msg_type} to {to}"

    def read_inbox(self, name: str) -> list:
        inbox_path = self.dir / f"{name}.jsonl"
        if not inbox_path.exists():
            return []
        messages = []
        for line in inbox_path.read_text().strip().splitlines():
            if line:
                messages.append(json.loads(line))
        inbox_path.write_text("")
        return messages

    def broadcast(self, sender: str, content: str, teammates: list) -> str:
        count = 0
        for name in teammates:
            if name != sender:
                self.send(sender, name, content, "broadcast")
                count += 1
        return f"Broadcast to {count} teammates"


BUS = MessageBus(INBOX_DIR)


# -- TeammateManager: persistent named agents with config.json --
class TeammateManager:
    def __init__(self, team_dir: Path):
        self.dir = team_dir
        self.dir.mkdir(exist_ok=True)
        self.config_path = self.dir / "config.json"
        self.config = self._load_config()
        self.threads = {}

    def _load_config(self) -> dict:
        if self.config_path.exists():
            return json.loads(self.config_path.read_text())
        return {"team_name": "default", "members": []}

    def _save_config(self):
        self.config_path.write_text(json.dumps(self.config, indent=2))

    def _find_member(self, name: str) -> dict:
        for m in self.config["members"]:
            if m["name"] == name:
                return m
        return None

    def spawn(self, name: str, role: str, prompt: str) -> str:
        member = self._find_member(name)
        if member:
            if member["status"] not in ("idle", "shutdown"):
                return f"Error: '{name}' is currently {member['status']}"
            member["status"] = "working"
            member["role"] = role
        else:
            member = {"name": name, "role": role, "status": "working"}
            self.config["members"].append(member)
        self._save_config()
        thread = threading.Thread(
            target=self._teammate_loop,
            args=(name, role, prompt),
            daemon=True,
        )
        self.threads[name] = thread
        thread.start()
        return f"Spawned '{name}' (role: {role})"

    def _teammate_loop(self, name: str, role: str, prompt: str):
        sys_prompt = (
            f"You are '{name}', role: {role}, at {WORKDIR}. "
            f"Use send_message to communicate. Complete your task."
        )
        messages = [{"role": "user", "content": prompt}]
        tools = self._teammate_tools()
        for _ in range(50):
            inbox = BUS.read_inbox(name)
            for msg in inbox:
                messages.append({"role": "user", "content": json.dumps(msg)})
            try:
                response = client.messages.create(
                    model=MODEL,
                    system=sys_prompt,
                    messages=messages,
                    tools=tools,
                    max_tokens=8000,
                )
            except Exception:
                break
            messages.append({"role": "assistant", "content": response.content})
            if response.stop_reason != "tool_use":
                break
            results = []
            for block in response.content:
                if block.type == "tool_use":
                    output = self._exec(name, block.name, block.input)
                    print(f"  [{name}] {block.name}: {str(output)[:120]}")
                    results.append({
                        "type": "tool_result",
                        "tool_use_id": block.id,
                        "content": str(output),
                    })
            messages.append({"role": "user", "content": results})
        member = self._find_member(name)
        if member and member["status"] != "shutdown":
            member["status"] = "idle"
            self._save_config()

    def _exec(self, sender: str, tool_name: str, args: dict) -> str:
        # these base tools are unchanged from s02
        if tool_name == "bash":
            return _run_bash(args["command"])
        if tool_name == "read_file":
            return _run_read(args["path"])
        if tool_name == "write_file":
            return _run_write(args["path"], args["content"])
        if tool_name == "edit_file":
            return _run_edit(args["path"], args["old_text"], args["new_text"])
        if tool_name == "send_message":
            return BUS.send(sender, args["to"], args["content"], args.get("msg_type", "message"))
        if tool_name == "read_inbox":
            return json.dumps(BUS.read_inbox(sender), indent=2)
        return f"Unknown tool: {tool_name}"

    def _teammate_tools(self) -> list:
        # these base tools are unchanged from s02
        return [
            {"name": "bash", "description": "Run a shell command.",
             "input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}},
            {"name": "read_file", "description": "Read file contents.",
             "input_schema": {"type": "object", "properties": {"path": {"type": "string"}}, "required": ["path"]}},
            {"name": "write_file", "description": "Write content to file.",
             "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}},
            {"name": "edit_file", "description": "Replace exact text in file.",
             "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}}, "required": ["path", "old_text", "new_text"]}},
            {"name": "send_message", "description": "Send message to a teammate.",
             "input_schema": {"type": "object", "properties": {"to": {"type": "string"}, "content": {"type": "string"}, "msg_type": {"type": "string", "enum": list(VALID_MSG_TYPES)}}, "required": ["to", "content"]}},
            {"name": "read_inbox", "description": "Read and drain your inbox.",
             "input_schema": {"type": "object", "properties": {}}},
        ]

    def list_all(self) -> str:
        if not self.config["members"]:
            return "No teammates."
        lines = [f"Team: {self.config['team_name']}"]
        for m in self.config["members"]:
            lines.append(f"  {m['name']} ({m['role']}): {m['status']}")
        return "\n".join(lines)

    def member_names(self) -> list:
        return [m["name"] for m in self.config["members"]]


TEAM = TeammateManager(TEAM_DIR)

关键代码解读

类名 比喻 核心作用 存储介质
MessageBus 团队邮局 负责 AI 之间发消息、收消息、清空消息 .team/inbox/xxx.jsonl(每个成员一个收件箱文件)
TeammateManager 团队人事经理 负责创建 AI 成员、管理成员状态、启动独立线程 .team/config.json(团队花名册)

第一部分:超细解读 MessageBus 消息邮局

核心作用

专门处理AI 成员之间的消息收发,所有消息都存在文件里(持久化,重启程序也不丢),每个成员有专属的 .jsonl 收件箱文件。


1. 初始化方法 __init__

def __init__(self, inbox_dir: Path):
    # 保存收件箱文件夹路径:.team/inbox
    self.dir = inbox_dir
    # 创建文件夹(不存在就新建,存在就跳过)
    self.dir.mkdir(parents=True, exist_ok=True)

解读

  • 一启动程序,就自动创建文件夹 .team/inbox
  • 这个文件夹里,每个 AI 成员对应一个收件箱文件:alice.jsonlbob.jsonl

2. 发消息方法 send(核心!)

def send(self, sender: str, to: str, content: str,
         msg_type: str = "message", extra: dict = None) -> str:
    # 校验消息类型是否合法(只能是预设的5种)
    if msg_type not in VALID_MSG_TYPES:
        return f"Error: Invalid type '{msg_type}'. Valid: {VALID_MSG_TYPES}"
    # 组装消息JSON格式
    msg = {
        "type": msg_type,       # 消息类型:普通消息/广播等
        "from": sender,         # 发送者:lead/alice/bob
        "content": content,     # 消息内容
        "timestamp": time.time(),# 时间戳
    }
    # 如果有额外信息,合并到消息里
    if extra:
        msg.update(extra)
    # 找到接收者的收件箱文件:.team/inbox/【to】.jsonl
    inbox_path = self.dir / f"{to}.jsonl"
    # 以【追加模式】写入消息(不会覆盖旧消息)
    with open(inbox_path, "a") as f:
        f.write(json.dumps(msg) + "\n")
    # 返回发送成功提示
    return f"Sent {msg_type} to {to}"
实际运行:Alice 给 Bob 发消息

调用代码BUS.send("Alice", "Bob", "我是Alice,开始编码")

JSON 消息格式

{
  "type": "message",
  "from": "Alice",
  "content": "我是Alice,开始编码",
  "timestamp": 1718999999.123
}

文件变化

自动创建 / 追加到 .team/inbox/bob.jsonl

{"type":"message","from":"Alice","content":"我是Alice,开始编码","timestamp":1718999999.123}

3. 读收件箱方法 read_inbox(核心!)

def read_inbox(self, name: str) -> list:
    # 找到对应成员的收件箱文件
    inbox_path = self.dir / f"{name}.jsonl"
    # 文件不存在,返回空列表
    if not inbox_path.exists():
        return []
    messages = []
    # 读取文件所有行,逐行解析为JSON消息
    for line in inbox_path.read_text().strip().splitlines():
        if line:
            messages.append(json.loads(line))
    # 读完**清空文件**(消息只读取一次)
    inbox_path.write_text("")
    # 返回所有消息列表
    return messages
实际运行:Bob 读取自己的收件箱

调用代码BUS.read_inbox("Bob")

执行前bob.jsonl 有 1 条消息

执行中:解析消息为 JSON 列表

[
  {
    "type": "message",
    "from": "Alice",
    "content": "我是Alice,开始编码",
    "timestamp": 1718999999.123
  }
]

执行后bob.jsonl清空(内容为空)


4. 广播方法 broadcast

def broadcast(self, sender: str, content: str, teammates: list) -> str:
    count = 0
    # 遍历所有团队成员,给除了自己以外的所有人发消息
    for name in teammates:
        if name != sender:
            self.send(sender, name, content, "broadcast")
            count += 1
    return f"Broadcast to {count} teammates"

解读:群发消息,比如 Lead 给 Alice、Bob 同时发通知。


5. 全局实例

BUS = MessageBus(INBOX_DIR)

解读:创建一个全局唯一的邮局,整个程序的 AI 都用它收发消息。


第二部分:超细解读 TeammateManager 团队管理员

核心作用

管理 AI 团队的成员创建、状态、线程运行,成员信息存在 config.json(花名册),每个成员独立运行一个线程(不会互相干扰)。


1. 初始化方法 __init__

def __init__(self, team_dir: Path):
    # 团队文件夹路径:.team
    self.dir = team_dir
    self.dir.mkdir(exist_ok=True)
    # 团队配置文件:.team/config.json
    self.config_path = self.dir / "config.json"
    # 加载配置(花名册)
    self.config = self._load_config()
    # 存储成员线程:key=成员名,value=线程对象
    self.threads = {}

2. 加载 / 保存配置(花名册)

# 加载花名册
def _load_config(self) -> dict:
    if self.config_path.exists():
        return json.loads(self.config_path.read_text())
    # 默认空团队
    return {"team_name": "default", "members": []}

# 保存花名册到文件
def _save_config(self):
    self.config_path.write_text(json.dumps(self.config, indent=2))
默认初始 JSON(config.json
{
  "team_name": "default",
  "members": []
}

3. 查找成员

def _find_member(self, name: str) -> dict:
    # 遍历花名册,找到对应名字的成员
    for m in self.config["members"]:
        if m["name"] == name:
            return m
    return None

4. 创建成员 spawn(核心!)

def spawn(self, name: str, role: str, prompt: str) -> str:
    # 查找成员是否已存在
    member = self._find_member(name)
    if member:
        # 成员正在工作,不能重复创建
        if member["status"] not in ("idle", "shutdown"):
            return f"Error: '{name}' is currently {member['status']}"
        # 更新状态为工作中
        member["status"] = "working"
        member["role"] = role
    else:
        # 新建成员:名字、角色、状态=工作中
        member = {"name": name, "role": role, "status": "working"}
        self.config["members"].append(member)
    # 保存到花名册文件
    self._save_config()
    # 创建独立线程,运行成员的循环
    thread = threading.Thread(
        target=self._teammate_loop,
        args=(name, role, prompt),
        daemon=True,
    )
    self.threads[name] = thread
    thread.start()
    return f"Spawned '{name}' (role: {role})"
实际运行:创建 Alice(编码者)

调用代码TEAM.spawn("Alice", "编码者", "我是编码者")

JSON 变化(config.json

{
  "team_name": "default",
  "members": [
    {
      "name": "Alice",
      "role": "编码者",
      "status": "working"
    }
  ]
}

再创建 Bob(测试者)后

{
  "team_name": "default",
  "members": [
    { "name": "Alice", "role": "编码者", "status": "working" },
    { "name": "Bob", "role": "测试者", "status": "working" }
  ]
}

5. 成员独立循环 _teammate_loop(核心!)

def _teammate_loop(self, name: str, role: str, prompt: str):
    # 成员的系统提示:我是XX,角色XX
    sys_prompt = f"You are '{name}', role: {role}"
    # 成员的对话历史
    messages = [{"role": "user", "content": prompt}]
    tools = self._teammate_tools()
    # 最多执行50轮任务
    for _ in range(50):
        # 1. 读取自己的收件箱(自动清空)
        inbox = BUS.read_inbox(name)
        # 2. 把消息加入自己的对话历史
        for msg in inbox:
            messages.append({"role": "user", "content": json.dumps(msg)})
        # 3. 调用AI思考
        response = client.messages.create(...)
        messages.append({"role": "assistant", "content": response.content})
        # 4. 执行工具(发消息、读文件等)
        if response.stop_reason == "tool_use":
            results = []
            for block in response.content:
                if block.type == "tool_use":
                    output = self._exec(name, block.name, block.input)
                    results.append(...)
            messages.append({"role": "user", "content": results})
    # 5. 任务完成,状态改为空闲
    member = self._find_member(name)
    member["status"] = "idle"
    self._save_config()
核心逻辑

每个成员独立跑一个死循环

  1. 自动查收自己的收件箱
  2. 把消息交给 AI 思考
  3. 执行工具(比如给其他成员发消息)
  4. 完成后状态改为 idle(空闲)
JSON 变化:Alice 完成任务后

config.json 中 Alice 状态从 workingidle

{ "name": "Alice", "role": "编码者", "status": "idle" }

6. 工具执行 _exec + 成员工具列表

# 执行成员的工具
def _exec(self, sender: str, tool_name: str, args: dict) -> str:
    if tool_name == "send_message":
        # 调用邮局发消息
        return BUS.send(sender, args["to"], args["content"])
    # 其他工具:bash、读/写文件
    ...

# 成员能用的所有工具
def _teammate_tools(self) -> list:
    return [bash, read_file, write_file, send_message, read_inbox]

解读:成员可以自己发消息、读文件、执行命令,完全独立工作。


7. 查询团队信息

# 列出所有成员
def list_all(self) -> str:
    lines = [f"Team: {self.config['team_name']}"]
    for m in self.config["members"]:
        lines.append(f"  {m['name']} ({m['role']}): {m['status']}")
    return "\n".join(lines)

# 获取所有成员名字
def member_names(self) -> list:
    return [m["name"] for m in self.config["members"]]

8. 全局实例

TEAM = TeammateManager(TEAM_DIR)

解读:创建一个全局唯一的团队管理员,管理所有 AI 成员。


第三部分:两个类联动 + 完整 JSON / 文件变化

场景:创建 Alice、Bob → Alice 给 Bob 发消息

步骤 1:创建 Alice

config.json` 新增成员,状态 `working

步骤 2:创建 Bob

config.json` 新增成员,状态 `working

步骤 3:Lead 给 Alice 发指令

写入 .team/inbox/alice.jsonl

步骤 4:Alice 自动读收件箱,给 Bob 发消息

写入 .team/inbox/bob.jsonl

步骤 5:Alice 完成任务,状态改为 idle

1. 读取自己的收件箱(自动清空)

    inbox = BUS.read_inbox(name)
    # 2. 把消息加入自己的对话历史
    for msg in inbox:
        messages.append({"role": "user", "content": json.dumps(msg)})
    # 3. 调用AI思考
    response = client.messages.create(...)
    messages.append({"role": "assistant", "content": response.content})
    # 4. 执行工具(发消息、读文件等)
    if response.stop_reason == "tool_use":
        results = []
        for block in response.content:
            if block.type == "tool_use":
                output = self._exec(name, block.name, block.input)
                results.append(...)
        messages.append({"role": "user", "content": results})
# 5. 任务完成,状态改为空闲
member = self._find_member(name)
member["status"] = "idle"
self._save_config()

####  核心逻辑

每个成员**独立跑一个死循环**:

1. 自动查收自己的收件箱
2. 把消息交给 AI 思考
3. 执行工具(比如给其他成员发消息)
4. 完成后状态改为 `idle`(空闲)

####  JSON 变化:Alice 完成任务后

`config.json` 中 Alice 状态从 `working` → `idle`:

{ “name”: “Alice”, “role”: “编码者”, “status”: “idle” }


------

### 6. 工具执行 `_exec` + 成员工具列表

执行成员的工具

def _exec(self, sender: str, tool_name: str, args: dict) -> str:
if tool_name == “send_message”:
# 调用邮局发消息
return BUS.send(sender, args[“to”], args[“content”])
# 其他工具:bash、读/写文件

成员能用的所有工具

def _teammate_tools(self) -> list:
return [bash, read_file, write_file, send_message, read_inbox]


**解读**:成员可以自己发消息、读文件、执行命令,完全独立工作。

------

### 7. 查询团队信息

列出所有成员

def list_all(self) -> str:
lines = [f"Team: {self.config[‘team_name’]}“]
for m in self.config[“members”]:
lines.append(f” {m[‘name’]} ({m[‘role’]}): {m[‘status’]}")
return “\n”.join(lines)

获取所有成员名字

def member_names(self) -> list:
return [m[“name”] for m in self.config[“members”]]


------

### 8. 全局实例

TEAM = TeammateManager(TEAM_DIR)


 **解读**:创建一个**全局唯一的团队管理员**,管理所有 AI 成员。

------

# 第三部分:两个类联动 + 完整 JSON / 文件变化

## 场景:创建 Alice、Bob → Alice 给 Bob 发消息

### 步骤 1:创建 Alice

config.json新增成员,状态working


### 步骤 2:创建 Bob

config.json新增成员,状态working


### 步骤 3:Lead 给 Alice 发指令

写入 `.team/inbox/alice.jsonl`

### 步骤 4:Alice 自动读收件箱,给 Bob 发消息

写入 `.team/inbox/bob.jsonl`

### 步骤 5:Alice 完成任务,状态改为 `idle`

`config.json` 更新状态

Logo

更多推荐