多Agent场景,子agent 之间数据读写不同步,如何解决?
多Agent 系统里,经常会出现一个单 Agent 里从来不会出现的问题:一个子 Agent 刚写完数据,另一个子 Agent 立刻去读,结果是空的。
多Agent 系统里,经常会出现一个单 Agent 里从来不会出现的问题:一个子 Agent 刚写完数据,另一个子 Agent 立刻去读,结果是空的。
根本问题出在 Agent 的写-读模式撞上了很多数据库为单 Agent 场景设计的默认一致性配置。
接下来,这篇文章将说清楚这个矛盾从哪来,以及怎么用一行参数解决它。
01
单 Agent 与多 Agent 的读写设计有何异同?
单 Agent RAG 的工作方式是这样的:用户提出一个问题,Agent 把问题向量化,去 Milvus 检索 Top-K 文档片段,拼成 prompt 喂给模型,模型输出答案。整条链路里,向量数据库是默认只读的——数据在应用启动时、文档更新时已经写好了,推理过程中没有人再继续往里写东西。
但多 Agent 系统里有两类角色:Writer Agent负责执行任务、调用外部工具、发现新信息,把结果 embedding 后写入 Milvus 作为共享记忆;Reader Agent收到协调信号后,从 Milvus 检索最新记忆,基于这些上下文生成下一步行动。
两者是独立的进程或线程,通过消息、回调或事件协调。Writer 写完,立刻通知 Reader,这个间隔是毫秒级的。

在这种情况下,Writer 写完、信号发出、Reader 立刻查,这种模式会导致Reader的查询动作,恰好落在“数据已写入但未对Query Node可见”的时间窗口内,最终返回空结果。
那么,这个时间窗口是怎么产生的,又要如何解决?
02
Milvus如何用四档一致性控制数据对外可见的时机
出现“写后读空”的关键,在于我们对Milvus的insert()操作存在一个认知误区——insert()返回成功,不代表数据已经可以被查询。
具体来说,Milvus 的写入流程分两段,insert()操作在第一阶段完成后就会返回“成功”,但数据此时只是被写入了消息队列(类似Kafka producer ack的语义)安全落盘,但消费者(Query Node)尚未处理,此时读取自然无法看到新数据。
如图所示,这个“写入成功到数据进入Growing Segment、查询可见”的几十毫秒到几秒的时间差,就是多Agent场景下读空问题的核心诱因。

要想解决这个问题,在Milvus中,我们可以通过guarantee_timestamp来控制数据的可见性:每次search()调用都携带上这个时间戳,Query Node执行查询前会先检查自己使用的数据版本是否追上了这个时间戳?没追上就等待,追上了再执行查询。
而我们在代码中设置的consistency_level(一致性级别),本质上就是在控制guarantee_timestamp的设定逻辑。
Milvus提供四档一致性选项,可在创建Collection时设置默认值,也可在每次search()调用时单独覆盖,不同级别对应不同的可见性、性能代价,具体如下:

这里需要重点说明:Milvus创建Collection的默认一致性级别是Bounded,这对单Agent RAG场景是完全合理的——因为单Agent场景没有推理过程中的写入操作,Bounded的5秒窗口不会被触发,既能保证检索性能,又能满足需求,是性能与体验的双赢选择。
但对于Writer写完数据后Reader立即查询的多Agent事件驱动场景,此时查询的guarantee_timestamp如果仍落在Bounded的5秒窗口内,新写入的数据就会不可见,返回空结果。
而解决这个问题的关键,就是将consistency_level从默认的Bounded,切换到适配多Agent场景的strong级别。
03
实验:Bounded 查不到,Strong 查得到
为了直观验证上述结论,我们设计了一组实验:通过模拟生产环境的高写压,让Query Node始终处于数据追赶状态,再执行“写入一条数据后立即查询”的操作,对比Bounded和Strong两种一致性级别的查询结果。
实验设计思路
通过两个机制模拟生产环境的写压,确保Query Node始终处于忙碌的追赶状态:
- preload预写:提前写入大批量数据,制造WAL(Write-Ahead Log)历史积压;
- storm writers后台写入:用多个后台线程持续高速写入数据,维持Query Node的追赶压力。
每轮实验中,先写入一条带唯一标记(marker)的记录,然后立即分别用Bounded和Strong级别查询该记录——一旦出现Bounded=0、Strong=1,即判定问题复现成功。
运行前提:pymilvus >= 2.6.0 已安装,Milvus 服务可访问。
#!/usr/bin/env python3
import argparse
import itertools
import random
import threading
import time
import uuid
from contextlib import suppress
from pymilvus import DataType, MilvusClient
def make_vector(seed, dim):
rng = random.Random(seed)
vec = [rng.uniform(-1.0, 1.0) for _ in range(dim)]
norm = sum(x * x for x in vec) ** 0.5 or 1.0
return [x / norm for x in vec]
def make_records(start_id, count, dim, marker, round_no):
return [
{
"id": start_id + i,
"vector": make_vector(start_id + i, dim),
"marker": marker,
"round": round_no,
}
for i in range(count)
]
def create_collection(client, name, dim):
if client.has_collection(name):
client.drop_collection(name)
schema = client.create_schema(auto_id=False, enable_dynamic_field=False)
schema.add_field("id", DataType.INT64, is_primary=True)
schema.add_field("vector", DataType.FLOAT_VECTOR, dim=dim)
schema.add_field("marker", DataType.VARCHAR, max_length=128)
schema.add_field("round", DataType.INT64)
index_params = client.prepare_index_params()
index_params.add_index(
field_name="vector",
index_type="AUTOINDEX",
metric_type="COSINE",
)
client.create_collection(
collection_name=name,
schema=schema,
index_params=index_params,
consistency_level="Bounded",
)
client.load_collection(name)
def search_marker(client, name, vector, marker, consistency, timeout):
result = client.search(
collection_name=name,
data=[vector],
anns_field="vector",
search_params={"metric_type": "COSINE"},
filter=f'marker == "{marker}"',
limit=1,
output_fields=["id", "marker", "round"],
consistency_level=consistency,
timeout=timeout,
)
hits = result[0] if result else []
return len(hits), hits
def writer_storm(uri, name, dim, stop_event, id_counter, batch_size, sleep_seconds):
client = MilvusClient(uri=uri)
while not stop_event.is_set():
start_id = next(id_counter)
records = make_records(start_id, batch_size, dim, "storm", -1)
with suppress(Exception):
client.insert(collection_name=name, data=records)
if sleep_seconds > 0:
time.sleep(sleep_seconds)
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--uri", default="http://localhost:19530")
parser.add_argument("--collection", default="")
parser.add_argument("--dim", type=int, default=16)
parser.add_argument("--attempts", type=int, default=200)
parser.add_argument("--bounded-timeout", type=float, default=2.0)
parser.add_argument("--strong-timeout", type=float, default=30.0)
parser.add_argument("--storm-writers", type=int, default=2)
parser.add_argument("--storm-batch-size", type=int, default=2000)
parser.add_argument("--storm-sleep", type=float, default=0.0)
parser.add_argument("--preload", type=int, default=5000)
parser.add_argument("--keep", action="store_true")
args = parser.parse_args()
collection = args.collection or f"consistency_probe_{int(time.time())}_{uuid.uuid4().hex[:8]}"
writer = MilvusClient(uri=args.uri)
bounded_reader = MilvusClient(uri=args.uri)
strong_reader = MilvusClient(uri=args.uri)
stop_event = threading.Event()
storm_threads = []
storm_id_counter = itertools.count(10_000_000, args.storm_batch_size)
print(f"uri={args.uri}")
print(f"collection={collection}")
try:
create_collection(writer, collection, args.dim)
if args.preload > 0:
print(f"preload {args.preload} rows")
writer.insert(
collection_name=collection,
data=make_records(1_000_000, args.preload, args.dim, "preload", -2),
)
_, _ = search_marker(
strong_reader,
collection,
make_vector(1_000_000, args.dim),
"preload",
"Strong",
args.strong_timeout,
)
for _ in range(args.storm_writers):
thread = threading.Thread(
target=writer_storm,
args=(
args.uri,
collection,
args.dim,
stop_event,
storm_id_counter,
args.storm_batch_size,
args.storm_sleep,
),
daemon=True,
)
thread.start()
storm_threads.append(thread)
for attempt in range(args.attempts):
marker = f"probe_{attempt}_{uuid.uuid4().hex[:12]}"
record_id = attempt + 1
vector = make_vector(record_id, args.dim)
record = {
"id": record_id,
"vector": vector,
"marker": marker,
"round": attempt,
}
insert_start = time.perf_counter()
writer.insert(collection_name=collection, data=[record])
insert_ms = (time.perf_counter() - insert_start) * 1000
bounded_start = time.perf_counter()
bounded_count, bounded_hits = search_marker(
bounded_reader, collection, vector, marker,
"Bounded", args.bounded_timeout,
)
bounded_ms = (time.perf_counter() - bounded_start) * 1000
strong_start = time.perf_counter()
strong_count, strong_hits = search_marker(
strong_reader, collection, vector, marker,
"Strong", args.strong_timeout,
)
strong_ms = (time.perf_counter() - strong_start) * 1000
print(
f"attempt={attempt:03d} insert={insert_ms:.1f}ms "
f"bounded={bounded_count}({bounded_ms:.1f}ms) "
f"strong={strong_count}({strong_ms:.1f}ms)"
)
if bounded_count == 0 and strong_count > 0:
print("\nREPRODUCED: Bounded missed the just-inserted row, Strong found it.")
print(f"marker={marker}")
print(f"strong_hit={strong_hits[0] if strong_hits else None}")
return
if bounded_hits and not strong_hits:
print("Unexpected: Bounded found the row but Strong did not; check service config.")
print("\nNot reproduced. QueryNode likely consumed the insert before each Bounded search.")
print("Try increasing --storm-writers/--storm-batch-size/--attempts, or run against a cluster under write load.")
finally:
stop_event.set()
for thread in storm_threads:
thread.join(timeout=1)
if args.keep:
print(f"kept collection={collection}")
else:
with suppress(Exception):
writer.drop_collection(collection)
print(f"dropped collection={collection}")
if __name__ == "__main__":
main()
运行命令(替换uri为自身Milvus服务地址):
python probe.py --uri http://localhost:19530 \--storm-writers 2 \
--storm-batch-size 2000 \
--preload 5000
运行结果:(与文档中报错URL对应的服务地址一致):
uri=http://localhost:19530
collection=consistency_probe_1777278755_71fb2959
preload 5000 rows
attempt=000 insert=47.7ms bounded=0(100.7ms) strong=1(171.7ms)
REPRODUCED: Bounded missed the just-inserted row, Strong found it.
marker=probe_0_96fadc07d29e
strong_hit={'id': 1, 'distance': 1.0, 'entity': {'marker': 'probe_0_96fadc07d29e', 'round': 0, 'id': 1}}
dropped collection=consistency_probe_1777278755_71fb2959
实验结论
第一次尝试(attempt=000)即复现:bounded=0 说明 Query Node 正忙于消费 storm writers 制造的写入积压,Bounded 的 guarantee_timestamp 落在本次写入之前,新记录对此次查询不可见;strong=1 说明 Strong 强制 Query Node 追赶到全局最新时间戳后再返回,新记录被稳定查到。
其中distance=1.0确认了查询向量与写入向量完全一致,排除了向量不匹配的干扰。这进一步证明:问题的核心不是数据未写入,而是一致性级别导致的数据可见性时序冲突,与原文开头提出的多Agent写后读空问题完全吻合。
04
不是所有场景都需要 Strong
虽然consistency_level=“Strong” 能解决多 Agent 写后立刻读的问题,但它需要等待所有并发写入同步完成,会牺牲一定的性能。
因此,我们无需盲目将所有场景都设置为Strong级别,核心判断标准是:写入和查询之间是否有明确的因果关系,以及查询对数据新鲜度的要求。
结合多Agent常见场景,我们整理了针对性的一致性级别推荐方案,兼顾性能与一致性需求:
有明确因果——Writer 写完触发 Reader 查,流水线上一阶段写完触发下一阶段读,用 Strong。
无固定因果、但必须看到最新——多个 Agent 并发读写共享状态,没有固定上下游,任何人的写入都可能影响其他人的决策。用 Strong,等全局最新。

学AI大模型的正确顺序,千万不要搞错了
🤔2026年AI风口已来!各行各业的AI渗透肉眼可见,超多公司要么转型做AI相关产品,要么高薪挖AI技术人才,机遇直接摆在眼前!
有往AI方向发展,或者本身有后端编程基础的朋友,直接冲AI大模型应用开发转岗超合适!
就算暂时不打算转岗,了解大模型、RAG、Prompt、Agent这些热门概念,能上手做简单项目,也绝对是求职加分王🔋

📝给大家整理了超全最新的AI大模型应用开发学习清单和资料,手把手帮你快速入门!👇👇
学习路线:
✅大模型基础认知—大模型核心原理、发展历程、主流模型(GPT、文心一言等)特点解析
✅核心技术模块—RAG检索增强生成、Prompt工程实战、Agent智能体开发逻辑
✅开发基础能力—Python进阶、API接口调用、大模型开发框架(LangChain等)实操
✅应用场景开发—智能问答系统、企业知识库、AIGC内容生成工具、行业定制化大模型应用
✅项目落地流程—需求拆解、技术选型、模型调优、测试上线、运维迭代
✅面试求职冲刺—岗位JD解析、简历AI项目包装、高频面试题汇总、模拟面经
以上6大模块,看似清晰好上手,实则每个部分都有扎实的核心内容需要吃透!
我把大模型的学习全流程已经整理📚好了!抓住AI时代风口,轻松解锁职业新可能,希望大家都能把握机遇,实现薪资/职业跃迁~
这份完整版的大模型 AI 学习资料已经上传CSDN,朋友们如果需要可以微信扫描下方CSDN官方认证二维码免费领取【保证100%免费】

更多推荐



所有评论(0)