FastGPT 向量数据库使用代码分析

向量数据库架构概览

FastGPT 的向量存储层通过统一的抽象层支持 PostgreSQL (pgvector) 和 Milvus 两种向量数据库(运行时根据环境变量二选一,而不是同时使用),提供高性能的向量存储和检索能力。

它主要用于 Agent 本地知识库向量化后的存储和检索,核心处理逻辑位于 `packages/service/common/vectorStore` 目录中。

1. 向量数据库选择策略

自动选择机制

// File: packages/service/common/vectorStore/controller.ts

/**
 * Chooses the vector-store backend from the environment configuration.
 *
 * Priority: PostgreSQL when `PG_ADDRESS` is set; otherwise Milvus when
 * `MILVUS_ADDRESS` is set; otherwise PostgreSQL as the fallback.
 */
const getVectorObj = () => {
  // Milvus wins only when it is configured and PostgreSQL is not.
  const useMilvus = !PG_ADDRESS && Boolean(MILVUS_ADDRESS);
  return useMilvus ? new MilvusCtrl() : new PgVectorCtrl();
};

/** Module-level backend instance, created once when this module is loaded. */
const Vector = getVectorObj();

环境变量配置

// File: packages/service/common/vectorStore/constants.ts

/** Database name selected/created when initialising the Milvus backend. */
export const DatasetVectorDbName = 'fastgpt';
/** Name of the PostgreSQL table / Milvus collection that holds dataset vectors. */
export const DatasetVectorTableName = 'modeldata';

/**
 * Connection settings, read once from the environment at module load.
 * `PG_ADDRESS` is sourced from `PG_URL`; the Milvus settings keep their env names.
 */
export const {
  PG_URL: PG_ADDRESS, // PostgreSQL connection string
  MILVUS_ADDRESS, // Milvus endpoint
  MILVUS_TOKEN // Milvus auth token
} = process.env;

2. 统一向量操作接口

核心操作接口

// File: packages/service/common/vectorStore/controller.ts

/**
 * Public vector-store API. Each export is the matching method of the backend
 * held in `Vector`, so callers are independent of which store is active.
 */
export const {
  init: initVectorStore, // prepare the store (schema, indexes, connection)
  delete: deleteDatasetDataVector, // remove vectors by id / dataset / collection
  embRecall: recallFromVectorStore, // similarity search
  getVectorDataByTime, // vectors created within a time window
  getVectorCountByTeamId, // vector count per team
  getVectorCountByDatasetId, // vector count per dataset
  getVectorCountByCollectionId // vector count per collection
} = Vector;

向量插入接口

/**
 * Embeds `query` with `model` and stores the resulting vector in the active
 * vector store.
 *
 * @param model  Embedding model used to vectorise the text.
 * @param query  Text to embed.
 * @param props  Ownership ids (team / dataset / collection) stored with the vector.
 * @returns The embedding token usage and the id assigned by the vector store.
 * @throws Error when the embedding call yields no vector. Previously
 *   `undefined` was forwarded to the store, where the pg backend would build
 *   the literal "[undefined]" and retry an insert that can never succeed.
 */
export const insertDatasetDataVector = async ({
  model,
  query,
  ...props
}: InsertVectorProps & {
  query: string;
  model: EmbeddingModelItemType;
}) => {
  // 1. Embed the text; type 'db' is the database-storage embedding type.
  const { vectors, tokens } = await getVectorsByText({
    model,
    input: query,
    type: 'db'
  });

  // Only the first vector is stored; refuse to continue without one.
  const [vector] = vectors;
  if (!vector) {
    throw new Error('insertDatasetDataVector: embedding returned no vector');
  }

  // 2. Persist the vector together with its ownership ids.
  const { insertId } = await Vector.insert({
    ...props,
    vector
  });

  return { tokens, insertId };
};

3. PostgreSQL 向量实现

数据库初始化

// File: packages/service/common/vectorStore/pg/class.ts
/**
 * pgvector-backed implementation of the vector-store controller.
 */
export class PgVectorCtrl {
  /**
   * Connects to PostgreSQL, then creates the `vector` extension, the vector
   * table and its indexes. Every statement uses IF NOT EXISTS, so re-running
   * init against an already-initialised database is harmless.
   */
  init = async () => {
    await connectPg();

    // 1. Enable pgvector and create the vector table.
    // NOTE(review): the column is fixed at VECTOR(1536); pgvector rejects vectors
    // of any other dimension, so every embedding model in use must emit 1536
    // dimensions — confirm.
    await PgClient.query(`
      CREATE EXTENSION IF NOT EXISTS vector;
      CREATE TABLE IF NOT EXISTS ${DatasetVectorTableName} (
          id BIGSERIAL PRIMARY KEY,
          vector VECTOR(1536) NOT NULL,           -- 1536维向量
          team_id VARCHAR(50) NOT NULL,           -- 团队ID
          dataset_id VARCHAR(50) NOT NULL,        -- 数据集ID
          collection_id VARCHAR(50) NOT NULL,     -- 集合ID
          createtime TIMESTAMP DEFAULT CURRENT_TIMESTAMP
      );
    `);

    // 2. HNSW vector index with the inner-product operator class
    //    (vector_ip_ops), matching the `<#>` operator used for recall.
    // CREATE INDEX CONCURRENTLY is not allowed inside a transaction block, so
    // each index statement must remain its own query.
    // NOTE(review): an interrupted CONCURRENTLY build leaves an INVALID index
    // under this name, and IF NOT EXISTS will then skip rebuilding it — confirm
    // invalid indexes are detected elsewhere.
    await PgClient.query(`
      CREATE INDEX CONCURRENTLY IF NOT EXISTS vector_index 
      ON ${DatasetVectorTableName} 
      USING hnsw (vector vector_ip_ops) 
      WITH (m = 32, ef_construction = 128);
    `);

    // 3. Composite B-tree index over the team / dataset / collection filter columns.
    await PgClient.query(`
      CREATE INDEX CONCURRENTLY IF NOT EXISTS team_dataset_collection_index 
      ON ${DatasetVectorTableName} 
      USING btree(team_id, dataset_id, collection_id);
    `);

    // 4. B-tree index on createtime, used by time-range queries.
    await PgClient.query(`
      CREATE INDEX CONCURRENTLY IF NOT EXISTS create_time_index 
      ON ${DatasetVectorTableName} 
      USING btree(createtime);
    `);
  };
}

向量插入实现

/**
 * Writes a single vector row to the pgvector table and returns its id.
 *
 * @param props  Vector plus its team / dataset / collection ids. `retry`
 *   (default 3) is the number of further attempts allowed after a thrown error.
 * @returns `{ insertId }`, the `id` of the inserted row.
 *
 * If the insert call throws, waits 500 ms and tries again until `retry` is
 * exhausted, then rejects with the last error. A zero-row result rejects with
 * 'insertDatasetData: no insert'. That rejection is *returned* rather than
 * thrown, so the catch below never sees it and it is not retried.
 */
insert = async (props: InsertVectorControllerProps): Promise<{ insertId: string }> => {
  const { teamId, datasetId, collectionId, vector, retry = 3 } = props;

  try {
    // One row: the vector in pgvector text form "[v1,v2,...]" plus the
    // stringified ownership ids.
    const insertResult = await PgClient.insert(DatasetVectorTableName, {
      values: [
        [
          { key: 'vector', value: `[${vector}]` },
          { key: 'team_id', value: String(teamId) },
          { key: 'dataset_id', value: String(datasetId) },
          { key: 'collection_id', value: String(collectionId) }
        ]
      ]
    });

    if (insertResult.rowCount === 0) return Promise.reject('insertDatasetData: no insert');

    const insertedRow = insertResult.rows[0];
    return { insertId: insertedRow.id };
  } catch (error) {
    // No attempts left: propagate the failure unchanged.
    if (retry <= 0) return Promise.reject(error);

    // Back off briefly, then try again with one fewer attempt remaining.
    await delay(500);
    return this.insert({ ...props, retry: retry - 1 });
  }
};

向量检索实现

/**
 * Nearest-neighbour search over the pgvector table (HNSW index).
 *
 * Results are limited to `teamId` and `datasetIds`:
 * - `filterCollectionIdList`, when given, is an allow-list of collections.
 * - `forbidCollectionIdList` is a deny-list.
 * - A collection present in both lists is excluded.
 * - An empty `datasetIds` or an empty effective allow-list returns no results
 *   without querying the database.
 *
 * @returns Hits ordered from most to least similar. `score` is the inner
 *   product (pgvector's `<#>` yields its negation, hence the `* -1`).
 *
 * Retries up to `retry` more times (default 2), waiting 500 ms between
 * attempts, as insert/delete do.
 */
embRecall = async (props: EmbeddingRecallCtrlProps): Promise<EmbeddingRecallResponse> => {
  const {
    teamId,
    datasetIds,
    vector,
    limit,
    forbidCollectionIdList,
    filterCollectionIdList,
    retry = 2
  } = props;

  // The query below is a multi-statement text query, so bind parameters are not
  // available. Render every id as a single-quoted SQL literal with embedded
  // quotes doubled, so an id cannot terminate the literal and inject SQL.
  // This assumes standard_conforming_strings=on (the PostgreSQL default).
  const toSqlLiteral = (value: unknown) => `'${String(value).replace(/'/g, "''")}'`;
  const toSqlList = (values: readonly unknown[]) => values.map(toSqlLiteral).join(',');

  // No datasets to search. Querying anyway would emit `dataset_id IN ()`, a syntax error.
  if (datasetIds.length === 0) {
    return { results: [] };
  }

  // Compare ids as strings on both sides so non-string ids still match.
  const forbidIds = forbidCollectionIdList.map((id) => String(id));
  const filterIds = filterCollectionIdList?.map((id) => String(id));
  const forbidSet = new Set(forbidIds);
  const filterSet = new Set(filterIds ?? []);

  // 1. Allow-list: requested collections minus forbidden ones.
  const allowedCollectionIds = filterIds?.filter((id) => !forbidSet.has(id));
  if (allowedCollectionIds && allowedCollectionIds.length === 0) {
    // Every requested collection is forbidden, so nothing can match.
    return { results: [] };
  }

  // 2. Deny-list: forbidden collections not already excluded by the allow-list.
  const deniedCollectionIds = filterIds
    ? forbidIds.filter((id) => !filterSet.has(id))
    : forbidIds;

  const filterCollectionIdSql = allowedCollectionIds
    ? `AND collection_id IN (${toSqlList(allowedCollectionIds)})`
    : '';
  const forbidCollectionSql =
    deniedCollectionIds.length > 0
      ? `AND collection_id NOT IN (${toSqlList(deniedCollectionIds)})`
      : '';

  try {
    // 3. SET LOCAL confines the ef_search override to this transaction.
    // eslint-disable-next-line @typescript-eslint/no-explicit-any -- untyped multi-statement result
    const results: any = await PgClient.query(`
      BEGIN;
        SET LOCAL hnsw.ef_search = ${global.systemEnv?.pgHNSWEfSearch || 100};
        SELECT id, collection_id, vector <#> '[${vector}]' AS score
        FROM ${DatasetVectorTableName}
        WHERE team_id=${toSqlLiteral(teamId)}
          AND dataset_id IN (${toSqlList(datasetIds)})
          ${filterCollectionIdSql}
          ${forbidCollectionSql}
        ORDER BY score
        LIMIT ${limit};
      COMMIT;
    `);

    // One result per statement: [BEGIN, SET, SELECT, COMMIT]; index 2 is the SELECT.
    const rows = results?.[2]?.rows as PgSearchRawType[] | undefined;
    if (!rows) {
      throw new Error('embRecall: missing SELECT result in pg response');
    }

    return {
      results: rows.map((item) => ({
        id: String(item.id),
        collectionId: item.collection_id,
        // `<#>` returns the negative inner product; negate it to get a similarity score.
        score: item.score * -1
      }))
    };
  } catch (error) {
    if (retry <= 0) return Promise.reject(error);
    await delay(500);
    return this.embRecall({ ...props, retry: retry - 1 });
  }
};

向量删除实现

/**
 * Deletes vectors owned by `teamId`, selected by exactly one of:
 *  - `id`: a single row id;
 *  - `datasetIds`, optionally narrowed by `collectionIds`;
 *  - `idList`: several row ids.
 *
 * Empty id lists are a no-op. An empty SQL `IN ()` list is a syntax error and
 * would otherwise fail every retry. Rejects with 'deleteDatasetData: no where'
 * when no selector is supplied. Retries the DELETE up to `retry` more times
 * (default 2), 500 ms apart.
 */
delete = async (props: DelDatasetVectorCtrlProps): Promise<any> => {
  const { teamId, retry = 2 } = props;

  // Quote every value as a SQL string literal, doubling embedded quotes, so ids
  // cannot break out of the literal. PostgreSQL resolves an untyped quoted
  // literal to the compared column's type, so this also works for the bigint
  // `id` column. Assumes standard_conforming_strings=on (the default).
  const toSqlLiteral = (value: unknown) => `'${String(value).replace(/'/g, "''")}'`;
  const toSqlList = (values: readonly unknown[]) => values.map(toSqlLiteral).join(',');

  const teamIdWhere = `team_id=${toSqlLiteral(teamId)} AND`;

  // Build the WHERE clause; `undefined` means there is nothing to delete.
  const where = await (() => {
    if ('id' in props && props.id) {
      return `${teamIdWhere} id=${toSqlLiteral(props.id)}`;
    }

    if ('datasetIds' in props && props.datasetIds) {
      if (props.datasetIds.length === 0) return;
      const datasetIdWhere = `dataset_id IN (${toSqlList(props.datasetIds)})`;

      if ('collectionIds' in props && props.collectionIds) {
        // Narrowing to zero collections matches nothing.
        if (props.collectionIds.length === 0) return;
        return `${teamIdWhere} ${datasetIdWhere} AND collection_id IN (${toSqlList(props.collectionIds)})`;
      }

      return `${teamIdWhere} ${datasetIdWhere}`;
    }

    if ('idList' in props && Array.isArray(props.idList)) {
      if (props.idList.length === 0) return;
      return `${teamIdWhere} id IN (${toSqlList(props.idList)})`;
    }

    return Promise.reject('deleteDatasetData: no where');
  })();

  if (!where) return;

  try {
    await PgClient.delete(DatasetVectorTableName, { where: [where] });
  } catch (error) {
    if (retry <= 0) return Promise.reject(error);
    await delay(500);
    return this.delete({ ...props, retry: retry - 1 });
  }
};

4. Milvus 向量实现

客户端连接

// File: packages/service/common/vectorStore/milvus/class.ts
export class MilvusCtrl {
  /**
   * Returns the shared Milvus client, constructing it on first use.
   *
   * The client is cached on `global.milvusClient`, so all calls (and all
   * MilvusCtrl instances) reuse a single client object. Rejects with
   * 'MILVUS_ADDRESS is not set' when no address is configured.
   */
  getClient = async () => {
    if (!MILVUS_ADDRESS) {
      return Promise.reject('MILVUS_ADDRESS is not set');
    }

    // Lazily create and memoise the client.
    if (!global.milvusClient) {
      global.milvusClient = new MilvusClient({
        address: MILVUS_ADDRESS,
        token: MILVUS_TOKEN
      });
    }

    return global.milvusClient;
  };
}

集合初始化

/**
 * Prepares Milvus for use:
 *  1. selects the `DatasetVectorDbName` database, creating it if missing (best-effort);
 *  2. creates the vector collection, with its schema and indexes, if it does not exist;
 *  3. loads the collection into memory unless it is already loaded or loading.
 */
init = async () => {
  const client = await this.getClient();
  
  // 1. Select the database, creating it first if it does not exist.
  // Best-effort: any error here is swallowed and initialisation continues with
  // whatever database the client is already using.
  // NOTE(review): the reason is not visible here (presumably deployments without
  // multi-database support); consider at least logging the error.
  try {
    const { db_names } = await client.listDatabases();
    
    if (!db_names.includes(DatasetVectorDbName)) {
      await client.createDatabase({ db_name: DatasetVectorDbName });
    }
    
    await client.useDatabase({ db_name: DatasetVectorDbName });
  } catch (error) {}
  
  // 2. Create the collection if it does not exist yet.
  const { value: hasCollection } = await client.hasCollection({
    collection_name: DatasetVectorTableName
  });
  
  if (!hasCollection) {
    await client.createCollection({
      collection_name: DatasetVectorTableName,
      description: 'Store dataset vector',
      enableDynamicField: true, // accept fields not declared in the schema below
      fields: [
        {
          name: 'id',
          data_type: DataType.Int64,
          is_primary_key: true,
          autoID: false // Milvus does not generate ids; the inserter must supply them
        },
        {
          name: 'vector',
          data_type: DataType.FloatVector,
          dim: 1536  // 1536-dimensional vector; embeddings must have exactly this length
        },
        { name: 'teamId', data_type: DataType.VarChar, max_length: 64 },
        { name: 'datasetId', data_type: DataType.VarChar, max_length: 64 },
        { name: 'collectionId', data_type: DataType.VarChar, max_length: 64 },
        { name: 'createTime', data_type: DataType.Int64 }
      ],
      index_params: [
        {
          field_name: 'vector',
          index_name: 'vector_HNSW',
          index_type: 'HNSW',
          metric_type: 'IP',  // inner-product similarity (larger = more similar)
          // NOTE(review): efConstruction (32) is below M (64), the reverse of the
          // pg HNSW index (m = 32, ef_construction = 128); confirm this is intended.
          params: { efConstruction: 32, M: 64 }
        },
        { field_name: 'teamId', index_type: 'Trie' },
        { field_name: 'datasetId', index_type: 'Trie' },
        { field_name: 'collectionId', index_type: 'Trie' },
        { field_name: 'createTime', index_type: 'STL_SORT' }
      ]
    });
  }
  
  // 3. Load the collection into memory only when it is not loaded at all
  //    (states other than NotExist / NotLoad are left untouched).
  const { state: colLoadState } = await client.getLoadState({
    collection_name: DatasetVectorTableName
  });
  
  if (colLoadState === LoadState.LoadStateNotExist || 
      colLoadState === LoadState.LoadStateNotLoad) {
    await client.loadCollectionSync({
      collection_name: DatasetVectorTableName
    });
  }
};

Milvus 向量检索

/**
 * Nearest-neighbour search in the Milvus collection (HNSW, IP metric).
 *
 * Results are limited to `teamId` and `datasetIds`:
 * - `filterCollectionIdList`, when given, is an allow-list of collections.
 * - `forbidCollectionIdList` is a deny-list.
 * - A collection present in both lists is excluded.
 * - An empty `datasetIds` or an empty effective allow-list returns no results
 *   without querying Milvus.
 *
 * @returns Hits with the inner-product `score` reported by Milvus (larger = more similar).
 *
 * Retries the search up to `retry` more times (default 2) when it throws.
 */
embRecall = async (props: EmbeddingRecallCtrlProps): Promise<EmbeddingRecallResponse> => {
  const client = await this.getClient();
  const {
    teamId,
    datasetIds,
    vector,
    limit,
    forbidCollectionIdList,
    filterCollectionIdList,
    retry = 2
  } = props;

  // Render a value as a double-quoted Milvus string literal. JSON.stringify
  // escapes `"` and `\`, so an id cannot terminate the literal and rewrite the
  // filter expression. Plain ids render exactly as before ("abc").
  const toMilvusString = (value: unknown) => JSON.stringify(String(value));
  const toMilvusList = (values: readonly unknown[]) => `[${values.map(toMilvusString).join(',')}]`;

  // No datasets to search.
  if (datasetIds.length === 0) {
    return { results: [] };
  }

  // Compare ids as strings on both sides so non-string ids still match.
  const forbidIds = forbidCollectionIdList.map((id) => String(id));
  const filterIds = filterCollectionIdList?.map((id) => String(id));
  const forbidSet = new Set(forbidIds);
  const filterSet = new Set(filterIds ?? []);

  // 1. Allow-list: requested collections minus forbidden ones.
  const allowedCollectionIds = filterIds?.filter((id) => !forbidSet.has(id));
  if (allowedCollectionIds && allowedCollectionIds.length === 0) {
    // Every requested collection is forbidden, so nothing can match.
    return { results: [] };
  }

  // 2. Deny-list: forbidden collections not already excluded by the allow-list.
  const deniedCollectionIds = filterIds
    ? forbidIds.filter((id) => !filterSet.has(id))
    : forbidIds;

  const collectionIdQuery = allowedCollectionIds
    ? `and (collectionId in ${toMilvusList(allowedCollectionIds)})`
    : '';
  const forbidColQuery =
    deniedCollectionIds.length > 0
      ? `and (collectionId not in ${toMilvusList(deniedCollectionIds)})`
      : '';

  try {
    // 3. Vector search restricted by the scalar filter expression.
    const { results } = await client.search({
      collection_name: DatasetVectorTableName,
      data: vector,
      limit,
      filter: `(teamId == ${toMilvusString(teamId)}) and (datasetId in ${toMilvusList(datasetIds)}) ${collectionIdQuery} ${forbidColQuery}`,
      output_fields: ['collectionId']
    });

    const rows = results as {
      score: number;
      id: string;
      collectionId: string;
    }[];

    return {
      results: rows.map((item) => ({
        id: String(item.id),
        collectionId: item.collectionId,
        score: item.score
      }))
    };
  } catch (error) {
    if (retry <= 0) return Promise.reject(error);
    return this.embRecall({ ...props, retry: retry - 1 });
  }
};

5. 向量数据类型定义

核心类型接口

// File: packages/service/common/vectorStore/type.d.ts

// Client singletons kept on `global` so they are shared across modules.
declare global {
  // PostgreSQL pool; presumably populated by connectPg (not shown here) — confirm.
  var pgClient: Pool | null;
  // Milvus client; lazily created and cached by MilvusCtrl.getClient.
  var milvusClient: MilvusClient | null;
}

/** One hit returned by a similarity search. */
export type EmbeddingRecallItemType = {
  id: string;           // vector row id, stringified
  collectionId: string; // collection the vector belongs to
  score: number;        // inner-product similarity; larger means more similar on both backends
};

操作参数类型

// File: packages/service/common/vectorStore/controller.d.ts

/** Ownership ids stored alongside every vector. */
export type InsertVectorProps = {
  teamId: string;       // owning team
  datasetId: string;    // owning dataset
  collectionId: string; // owning collection
};

/**
 * Scope of a similarity search. The controller-level props consumed by
 * embRecall additionally carry `vector`, `limit` and an optional `retry`.
 */
export type EmbeddingRecallProps = {
  teamId: string;                    // only this team's vectors are searched
  datasetIds: string[];              // datasets to search
  forbidCollectionIdList: string[];  // deny-list: collections to exclude
  filterCollectionIdList?: string[]; // optional allow-list: when set, only these collections (minus the deny-list) are searched
};

/** Selector for vector deletion; always scoped to `teamId`. */
export type DeleteDatasetVectorProps = (
  | { id: string }                                    // delete one vector by id
  | { datasetIds: string[]; collectionIds?: string[] } // delete by dataset(s), optionally narrowed to collection(s)
  | { idList: string[] }                              // delete several vectors by id
) & {
  teamId: string;
};

6. 向量检索优化

HNSW 参数优化

// PostgreSQL HNSW parameters: index build settings, plus the per-transaction
// search breadth (defaults to 100 unless systemEnv.pgHNSWEfSearch is set).
WITH (m = 32, ef_construction = 128)
SET LOCAL hnsw.ef_search = ${global.systemEnv?.pgHNSWEfSearch || 100};

// Milvus HNSW parameters; note efConstruction < M here, the reverse of the
// PostgreSQL build settings above.
params: { efConstruction: 32, M: 64 }

相似度计算

  • PostgreSQL: 使用 pgvector 的 <#> 操作符。它返回的是负内积 (negative inner product),因此按 score 升序排序即为相似度从高到低;结果再乘以 -1,还原为内积相似度分数
  • Milvus: 使用 IP (Inner Product) 度量,直接返回相似度分数

索引策略

  • 向量索引: HNSW 算法,平衡检索速度和精度
  • 标量索引: 团队、数据集、集合的 B-tree/Trie 索引
  • 时间索引: 支持按时间范围查询向量数据

7. 统计和监控

向量数量统计

/**
 * Counts the vectors stored for one team.
 *
 * @param teamId  Team whose vectors are counted (compared as a string).
 */
getVectorCountByTeamId = async (teamId: string) =>
  PgClient.count(DatasetVectorTableName, {
    where: [['team_id', String(teamId)]]
  });

/**
 * Counts the vectors stored for one dataset of a team.
 *
 * @param teamId     Owning team (compared as a string).
 * @param datasetId  Dataset whose vectors are counted (compared as a string).
 */
getVectorCountByDatasetId = async (teamId: string, datasetId: string) =>
  PgClient.count(DatasetVectorTableName, {
    where: [['team_id', String(teamId)], 'and', ['dataset_id', String(datasetId)]]
  });

时间范围查询

/**
 * Lists vectors whose `createtime` lies between `start` and `end` (BETWEEN is inclusive).
 *
 * Both bounds are formatted with dayjs as 'YYYY-MM-DD HH:mm:ss'.
 * NOTE(review): dayjs formats in the Node process's local time, while
 * `createtime` is a TIMESTAMP without time zone; confirm the two agree.
 *
 * @returns `{ id, teamId, datasetId }` per row, with `id` stringified.
 */
getVectorDataByTime = async (start: Date, end: Date) => {
  const timeFormat = 'YYYY-MM-DD HH:mm:ss';
  const from = dayjs(start).format(timeFormat);
  const to = dayjs(end).format(timeFormat);

  const { rows } = await PgClient.query(`
    SELECT id, team_id, dataset_id
    FROM ${DatasetVectorTableName}
    WHERE createtime BETWEEN '${from}' 
      AND '${to}';
  `);

  // Map snake_case columns to camelCase fields.
  return rows.map((row) => {
    const { id, team_id: teamId, dataset_id: datasetId } = row;
    return { id: String(id), teamId, datasetId };
  });
};

总结

FastGPT 向量数据库实现了多数据库支持、统一接口、高性能检索的架构:

核心特性

  1. 多数据库支持: 可在 PostgreSQL 与 Milvus 之间二选一
  2. 统一抽象: 相同接口适配不同向量数据库
  3. 自动选择: 按环境变量的固定优先级选择数据库(配置了 PG_URL 时使用 PostgreSQL;否则配置了 MILVUS_ADDRESS 时使用 Milvus;两者都未配置时默认使用 PostgreSQL)
  4. 高性能索引: HNSW 算法优化向量检索
  5. 灵活过滤: 支持团队、数据集、集合多级过滤
  6. 错误重试: 写入、检索、删除操作内置有限次数的重试机制,提升稳定性
  7. 统计监控: 提供按团队、数据集、集合统计向量数量,以及按时间范围查询向量数据的接口

技术优势

  • PostgreSQL: 成熟稳定,SQL 生态丰富,适合中小规模
  • Milvus: 专业向量数据库,高性能,适合大规模场景
  • 统一接口: 上层业务通过同一组接口访问向量库,切换数据库无需修改业务代码(但已有向量数据需另行迁移或重新向量化),从而降低迁移成本
  • 优化配置: 针对不同场景的参数优化
Logo

为武汉地区的开发者提供学习、交流和合作的平台。社区聚集了众多技术爱好者和专业人士,涵盖了多个领域,包括人工智能、大数据、云计算、区块链等。社区定期举办技术分享、培训和活动,为开发者提供更多的学习和交流机会。

更多推荐