系列「企业级 AI Agent 实现拆解」E13 篇。上一篇 E12 讲了 Document 组件——怎么加载文件、切成小块。切完了之后怎么办?存起来。存在哪?向量数据库。谁来存?Indexer。这篇拆 Indexer 组件:一个接口、五种后端、一条写路径。

读完这篇你会知道

  • Indexer 接口只有一个方法,为什么这么设计
  • Indexer 和 Embedding 的分工:谁来算向量
  • 五种后端(VikingDB / Redis / Elasticsearch / Milvus / Qdrant)配置有什么区别
  • VikingDB 内置向量化和自定义向量化怎么选
  • Redis Indexer 的 DocumentToHashes 映射函数怎么写
  • 完整的知识库入库 Pipeline:Loader → Splitter → Indexer

一、先说清楚写路径

RAG 有两条路径:

  • 写路径:文件 → 切块 → 向量化 → 存储(建库)
  • 读路径:用户提问 → 向量化 → 检索(Retriever)→ 返回最相关的块

Indexer 负责写路径的最后一步:把 Document 列表存进向量数据库


二、接口极简:只有一个方法

源码在 eino/components/indexer/interface.go

type Indexer interface {
    Store(ctx context.Context, docs []*schema.Document, opts ...Option) (ids []string, err error)
}

就一个 Store()。传入 Document 列表,返回存储后的 ID 列表。

Option 结构体里有三个字段:

type Options struct {
    Index      *string            // 运行时选择索引(覆盖配置里的默认值)
    SubIndexes []string           // 子索引,实现逻辑分区
    Embedding  embedding.Embedder // 运行时替换向量化器
}

三个字段都是可选的,都能在调用时覆盖配置里的默认值。


三、Indexer 和 Embedding 怎么分工

Indexer 本身不做向量化,它通过注入一个 Embedding 实例来完成向量化:

// 配置时注入(启动时绑定)
config.Embedding = myEmbeddingInstance
indexer, _ := NewIndexer(ctx, config)

// 调用时覆盖(单次生效)
indexer.Store(ctx, docs, indexer.WithEmbedding(otherEmbedder))

这个设计的好处:Indexer 的实现代码和向量化逻辑完全解耦,你可以随时换一个 Embedding 模型,不改 Indexer 代码。

铁律:Indexer 和对应的 Retriever 必须用同一个 Embedding 模型。 写入用 text-embedding-3-small 算向量,检索用 text-embedding-ada-002 就完全错了——维度一致但语义空间不同,检索结果会乱。


四、五种后端配置

VikingDB(火山引擎)

源码:eino-ext/components/indexer/volc_vikingdb/indexer.go

VikingDB 是唯一支持内置向量化的后端——不传 Embedding 实例,让 VikingDB 平台在服务端帮你算向量:

cfg := &volc_vikingdb.IndexerConfig{
    Host:       "api-vikingdb.volces.com",
    Region:     "cn-beijing",
    AK:         ak,
    SK:         sk,
    Collection: "eino_test",
    EmbeddingConfig: volc_vikingdb.EmbeddingConfig{
        UseBuiltin: true,      // 使用平台内置向量化
        ModelName:  "bge-m3",  // 支持稠密+稀疏混合向量
        UseSparse:  true,      // 同时返回稀疏向量(用于混合检索)
    },
    AddBatchSize: 10,
}

也支持传自定义 Embedding:

EmbeddingConfig: volc_vikingdb.EmbeddingConfig{
    UseBuiltin: false,
    Embedding:  myEmbedder,  // 自定义向量化器
},

VikingDB 还支持额外字段和 TTL(其他后端都不支持):

volc_vikingdb.SetExtraDataFields(doc, map[string]interface{}{
    "department": "engineering",
})
volc_vikingdb.SetExtraDataTTL(doc, 86400)  // 24小时后自动删除

Redis

源码:eino-ext/components/indexer/redis/indexer.go

Redis 的特点是完全自定义映射——通过 DocumentToHashes 函数决定 Document 怎么存成 Redis Hash:

// 来自 eino-examples 的真实实现
config := &redis.IndexerConfig{
    Client:    redisClient,
    KeyPrefix: "eino_doc:",
    BatchSize: 1,
    Embedding: embeddingIns,

    DocumentToHashes: func(ctx context.Context, doc *schema.Document) (*redis.Hashes, error) {
        if doc.ID == "" {
            doc.ID = uuid.New().String()
        }
        metadataBytes, _ := json.Marshal(doc.MetaData)

        return &redis.Hashes{
            Key: doc.ID,
            Field2Value: map[string]redis.FieldValue{
                "content": {
                    Value:    doc.Content,
                    EmbedKey: "vector",  // 这个字段会被向量化,向量存到 "vector" key
                },
                "metadata": {
                    Value: metadataBytes,  // 不带 EmbedKey = 不向量化,直接存
                },
            },
        }, nil
    },
}

最终 Redis 里长这样:

HSET eino_doc:uuid-xxx
  content  "Go 的并发模型基于 CSP..."
  vector   <1536维二进制向量>
  metadata "{\"chapter\":\"第三章\",\"section\":\"3.1\"}"

不传 DocumentToHashes 则用默认映射:content 向量化,MetaData 所有 key 直接存。

Elasticsearch 8

源码:eino-ext/components/indexer/es8/indexer.go

结构和 Redis 类似,支持自动创建索引

config := &es8.IndexerConfig{
    Client: esClient,
    Index:  "knowledge_base",
    IndexSpec: &es8.IndexSpec{  // 不传则索引需提前手动创建
        Settings: map[string]any{
            "number_of_shards":   1,
            "number_of_replicas": 0,
        },
        Mappings: map[string]any{
            "properties": map[string]any{
                "content": map[string]any{"type": "text"},
                "vector":  map[string]any{
                    "type": "dense_vector",
                    "dims": 1536,
                },
            },
        },
    },
    Embedding: myEmbedder,
}

底层用 ES Bulk Indexer,高吞吐批量写入。

Milvus2

源码:eino-ext/components/indexer/milvus2/indexer.go

功能最全,支持稀疏+稠密混合向量、BM25 内置函数、动态 Schema:

cfg := &milvus2.IndexerConfig{
    ClientConfig: &milvusclient.ClientConfig{Address: "localhost:19530"},
    Collection:   "eino_collection",
    Vector: &milvus2.VectorConfig{
        Dimension:   1536,
        MetricType:  milvus2.MetricTypeCosine,
        VectorField: "vector",
    },
    Embedding:           myEmbedder,
    EnableDynamicSchema: true,  // 动态字段,无需预定义 Schema
}

Qdrant

配置最简洁:

cfg := &qdrant.Config{
    Client:     qdrantClient,
    Collection: "eino_collection",
    VectorDim:  1536,
    Distance:   qdrant.Distance_Cosine,
    Embedding:  myEmbedder,
}

五、五种后端一眼看差别

VikingDB Redis ES8 Milvus2 Qdrant
内置向量化 ✓(bge-m3等)
稀疏向量
TTL
自动建索引
自定义映射函数
国产云 火山引擎

六、完整 Pipeline:从文件到向量库

以 eino-examples 里的知识库入库 Pipeline 为例:

// 源码:eino-examples/quickstart/eino_assistant/eino/knowledgeindexing/orchestration.go
func BuildKnowledgeIndexing(ctx context.Context) (compose.Runnable[document.Source, []string], error) {
    g := compose.NewGraph[document.Source, []string]()

    // 节点 1:读文件
    fileLoader, _ := file.NewFileLoader(ctx, &file.FileLoaderConfig{})
    g.AddLoaderNode("Loader", fileLoader)

    // 节点 2:按 Markdown 标题切片
    splitter, _ := markdown.NewHeaderSplitter(ctx, &markdown.HeaderConfig{
        Headers: map[string]string{"#": "title", "##": "section"},
    })
    g.AddDocumentTransformerNode("Splitter", splitter)

    // 节点 3:向量化 + 存入 Redis
    redisIndexer, _ := redis.NewIndexer(ctx, &redis.IndexerConfig{
        Client:    redisClient,
        KeyPrefix: "kb:",
        Embedding: myEmbedder,
    })
    g.AddIndexerNode("Indexer", redisIndexer)

    // 连线
    g.AddEdge(compose.START, "Loader")
    g.AddEdge("Loader", "Splitter")
    g.AddEdge("Splitter", "Indexer")
    g.AddEdge("Indexer", compose.END)

    return g.Compile(ctx, compose.WithGraphName("KnowledgeIndexing"))
}

批量入库:

pipeline, _ := BuildKnowledgeIndexing(ctx)

filepath.WalkDir(docsDir, func(path string, d fs.DirEntry, err error) error {
    if strings.HasSuffix(path, ".md") {
        ids, _ := pipeline.Invoke(ctx, document.Source{URI: path})
        fmt.Printf("入库 %d 块:%s\n", len(ids), path)
    }
    return nil
})

整条链路:一个 Markdown 文件进去,N 个向量 ID 出来。


七、加 Callback 监控入库

handler := callbacksHelper.NewHandlerHelper().
    Indexer(&callbacksHelper.IndexerCallbackHandler{
        OnStart: func(ctx context.Context, info *callbacks.RunInfo, input *indexer.CallbackInput) context.Context {
            log.Printf("[Indexer] 开始存入 %d 个块", len(input.Docs))
            return ctx
        },
        OnEnd: func(ctx context.Context, info *callbacks.RunInfo, output *indexer.CallbackOutput) context.Context {
            log.Printf("[Indexer] 已存入,IDs: %v", output.IDs)
            return ctx
        },
    }).
    Handler()

ids, _ := pipeline.Invoke(ctx, src, compose.WithCallbacks(handler))

小结

切好的 [Document, Document, ...]
    ↓ Indexer.Store()
    ├── 调 Embedding.EmbedStrings() 向量化 Content
    ├── 批量写入后端(Redis/ES/Milvus 等)
    └── 返回 [ID1, ID2, ...]

选哪个后端?

场景 推荐
国内云,要混合检索(稠密+稀疏) VikingDB
已有 Redis,快速上线 Redis Indexer
已有 ES,需要全文+向量混搜 ES8
需要最灵活的向量索引配置 Milvus2
轻量自建,配置简单 Qdrant

Indexer 只是写路径的终点。存进去之后,读路径由 Retriever 负责——那是下一篇的主题。

Logo

小龙虾开发者社区是 CSDN 旗下专注 OpenClaw 生态的官方阵地,聚焦技能开发、插件实践与部署教程,为开发者提供可直接落地的方案、工具与交流平台,助力高效构建与落地 AI 应用

更多推荐