learn claude code S09 Agent 团队详解笔记
基于源码逐行分析,配合设计思路。
S09 Agent 团队详解笔记
基于
s09_agent_teams.py源码逐行分析,配合s09-agent-teams.md设计思路。
一、问题:子 agent 用完就没了
s04 的子 agent 是一个"一次性工具":模型调 task → 子 agent 启动 → 干完活 → 返回摘要 → 销毁。每次需要子 agent 就得重新 spawn,之前的上下文全部丢失。
这在复杂项目中是不够的。想象一个团队:Alice 负责后端 API,Bob 负责前端组件,Lead 负责协调。他们需要:
- 持久存在:Alice 一直在后台,不是每次都要重新创建
- 互相通信:Bob 写好了组件,通知 Alice “接口可以对接了”
- 独立工作:每个人跑自己的循环,互不阻塞
s09 把这个"一次性工具"升级为"持久团队"。
二、解决方案:文件收件箱 + 线程化队友
s09 的核心是两个组件:
MessageBus:每个 teammate 在 .team/inbox/ 下有一个 JSONL 文件(如 alice.jsonl)。发送消息 = 往文件尾部追加一行 JSON。读取消息 = 读取全部行然后清空文件。
TeammateManager:每个 teammate 在独立线程中运行自己的 agent_loop。从 inbox 读消息 → 调用 API → 执行工具 → 可能发消息给其他 teammate → 循环。
Subagent (s04): spawn → execute → return summary → destroyed
Teammate (s09): spawn → work → idle → work → ... → shutdown
三、和 s08 相比,多了什么?
| 组件 | s08 | s09 |
|---|---|---|
| 并发模型 | 后台线程(跑命令) | 后台线程(跑完整 agent_loop) |
| 通信 | 通知队列 | JSONL inbox(文件持久化) |
| 生命周期 | 一次性(命令完成即结束) | 持久(idle/working/shutdown) |
| 新工具 | background_run, check_bg |
spawn_teammate, send_message, read_inbox, broadcast |
四、MessageBus:基于文件的收件箱
4.1 send() — 追加一行 JSON
def send(self, sender: str, to: str, content: str,
msg_type: str = "message", extra: dict = None) -> str:
msg = {
"type": msg_type,
"from": sender,
"content": content,
"timestamp": time.time(),
}
if extra:
msg.update(extra)
inbox_path = self.dir / f"{to}.jsonl"
with open(inbox_path, "a") as f: # "a" = append,追加模式
f.write(json.dumps(msg) + "\n")
return f"Sent {msg_type} to {to}"
每条消息一行 JSON。文件操作 open(path, "a") 是追加模式——不会覆盖已有内容。多个线程同时往同一个 inbox 追加消息是相对安全的(操作系统级的 O_APPEND 是原子的,只要写的行不太长)。
4.2 read_inbox() — 读取并清空
def read_inbox(self, name: str) -> list:
inbox_path = self.dir / f"{name}.jsonl"
if not inbox_path.exists():
return []
messages = []
for line in inbox_path.read_text().strip().splitlines():
if line:
messages.append(json.loads(line))
inbox_path.write_text("") # 读完立刻清空
return messages
读完即清空——每条消息只投递一次。这和 s08 的 drain_notifications 是同一模式。不会出现模型反复看到同一条消息的问题。
4.3 broadcast() — 群发
def broadcast(self, sender: str, content: str, teammates: list) -> str:
count = 0
for name in teammates:
if name != sender: # 不给自己发
self.send(sender, name, content, "broadcast")
count += 1
return f"Broadcast to {count} teammates"
遍历所有 teammate,除了发送者自己。广播用于"全体通知"——如 lead 说"需求变更了,所有人停下手头工作"。
五、TeammateManager:队友的生命周期管理
5.1 配置文件:.team/config.json
{
"team_name": "default",
"members": [
{"name": "alice", "role": "coder", "status": "working"},
{"name": "bob", "role": "tester", "status": "idle"}
]
}
配置的持久化和 s07 的 TaskManager 一样:JSON 文件,存在 .team/ 下,压缩和重启后不丢失。
5.2 spawn() — 创建队友线程
def spawn(self, name: str, role: str, prompt: str) -> str:
member = self._find_member(name)
if member:
# 已存在 → 激活它(如果状态允许)
if member["status"] not in ("idle", "shutdown"):
return f"Error: '{name}' is currently {member['status']}"
member["status"] = "working"
else:
# 不存在 → 创建新成员
member = {"name": name, "role": role, "status": "working"}
self.config["members"].append(member)
self._save_config()
thread = threading.Thread(
target=self._teammate_loop,
args=(name, role, prompt),
daemon=True,
)
self.threads[name] = thread
thread.start()
return f"Spawned '{name}' (role: {role})"
已存在的 teammate 可以被"重新激活"——这和 s04 的子 agent 每次都从头创建完全不同。一个 teammate 完成一个任务后进入 idle,lead 可以给它新的 prompt,它带着之前的上下文继续工作。
5.3 _teammate_loop() — 队友自己的 agent 循环
每个 teammate 在独立线程中运行这个函数:
def _teammate_loop(self, name: str, role: str, prompt: str):
sys_prompt = (
f"You are '{name}', role: {role}, at {WORKDIR}. "
f"Use send_message to communicate. Complete your task."
)
messages = [{"role": "user", "content": prompt}]
tools = self._teammate_tools()
for _ in range(50): # 最多 50 轮
# 1. 读 inbox
inbox = BUS.read_inbox(name)
for msg in inbox:
messages.append({"role": "user", "content": json.dumps(msg)})
# 2. 调用 API
response = client.messages.create(...)
# 3. 记录回复
messages.append({"role": "assistant", "content": response.content})
# 4. 检查 stop_reason
if response.stop_reason != "tool_use":
break
# 5. 执行工具(包括 send_message 发消息给其他队友)
results = []
for block in response.content:
if block.type == "tool_use":
output = self._exec(name, block.name, block.input)
results.append({"type": "tool_result", ...})
messages.append({"role": "user", "content": results})
# 完成 → 状态切为 idle
member = self._find_member(name)
if member and member["status"] != "shutdown":
member["status"] = "idle"
这和 s01 的 agent_loop 是同一个结构——while 循环 + API 调用 + 工具执行。但有两个关键差异:
- 每轮先读 inbox:其他 teammate 发来的消息在下一轮开始前被消费
- 最多 50 轮:防止队友无限循环,50 轮后自然结束进入 idle
5.4 队友的工具集
队友有 6 个工具:bash、read_file、write_file、edit_file(基础工具)+ send_message、read_inbox(通信工具)。注意队友没有 spawn_teammate——防止递归创建无限嵌套的代理。这和 s04 的子 agent 不能调 task 是同一原则。
5.5 Lead 的工具集
Lead(主 agent)拥有 9 个工具:4 个基础 + spawn_teammate、list_teammates、send_message、read_inbox、broadcast。Lead 是团队的协调者——创建队友、发送指令、群发通知。
六、agent_loop 的改造
Lead 的循环和 teammate 的循环结构相同,唯一差异是每轮 API 调用前也读 inbox:
def agent_loop(messages: list):
while True:
inbox = BUS.read_inbox("lead")
if inbox:
messages.append({
"role": "user",
"content": f"<inbox>{json.dumps(inbox, indent=2)}</inbox>",
})
# ...后续流程不变...
lead 的 inbox 文件名是 lead.jsonl。teammate 发给 lead 的消息(如"任务完成了"、“遇到问题需要帮助”)出现在这里。
七、完整流程走读
用户:“Alice 写 API,Bob 写测试,两人协作。”
第 1 轮
Lead 调用 spawn_teammate("alice", "backend", "实现 /api/users 接口")。Alice 的线程启动,开始在自己的循环中工作。
第 2 轮
Lead 调用 spawn_teammate("bob", "tester", "为 /api/users 写测试")。Bob 的线程启动。
第 3-10 轮
Alice 在后台读代码、写代码、跑测试。Alice 调用 send_message("bob", "接口初步完成,路由是 /api/users,返回 JSON 格式见下方")。这条消息被追加到 bob.jsonl。
Bob 在下一轮循环开始时,read_inbox("bob") 读到 Alice 的消息,开始基于接口规范写测试。
第 11 轮
Lead 查 inbox:read_inbox("lead")。Alice 发来 “任务完成”。Lead 的 agent_loop 在消息中注入 inbox 内容。
八、设计洞察
8.1 文件即消息队列
MessageBus 没有用 Redis、RabbitMQ、甚至 Python 的 queue.Queue。就是 JSONL 文件 + append + read-and-clear。为什么?
- 零依赖:不需要安装任何消息中间件
- 持久化:程序崩溃后消息还在磁盘上
- 可调试:
cat .team/inbox/alice.jsonl看所有消息 - 跨进程:文件可以被不同进程读写(为 s12 的 worktree 隔离做准备)
当并发量小时,文件系统是最简单的消息队列。
8.2 s04 vs s09:一次性 vs 持续性
| s04 子 agent | s09 队友 | |
|---|---|---|
| 生命周期 | 一次任务 → 销毁 | spawn → work → idle → work… |
| 上下文 | 每次新建 | 保留(可以跨任务积累经验) |
| 通信 | 无 | inbox + send_message |
| 命名 | 无 | 有名字、有角色、有状态 |
s09 的队友是"可以合作的人",s04 的子 agent 是"可以调用的函数"。
8.3 读后即焚 vs 读后保留
read_inbox 读完清空文件——这是刻意设计。如果不清空,队友每轮循环都会重新处理旧消息。而旧消息已经被处理过(执行了工具、发回了回复),重复处理会导致混乱。"读后即焚"保证每条消息只被消费一次,和消息队列的 at-most-once 语义一致。
更多推荐



所有评论(0)