FastGPT Source Code Analysis: How the Agent Uses the PostgreSQL and Milvus Vector Databases
FastGPT Vector Database Usage: Code Analysis
Vector Database Architecture Overview
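FastGPT puts more than one vector database behind a single abstraction layer: either PostgreSQL (with the pgvector extension) or Milvus serves as the store for high-performance vector storage and retrieval, and the rest of the code calls the same interface in both cases.
Its main job is storing and retrieving the embedded contents of the Agent's local knowledge bases. The core logic lives under packages/service/common/vectorStore.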
1. Vector Database Selection Strategy
Automatic selection
// File: packages/service/common/vectorStore/controller.ts
const getVectorObj = () => {
if (PG_ADDRESS) return new PgVectorCtrl(); // PostgreSQL takes priority whenever PG_URL is set
if (MILVUS_ADDRESS) return new MilvusCtrl(); // otherwise Milvus, if MILVUS_ADDRESS is set
return new PgVectorCtrl(); // fallback when neither is set: PostgreSQL
};
const Vector = getVectorObj();
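The selection rule is "first match wins": a configured PG_URL beats MILVUS_ADDRESS, and with neither set the code still falls back to PostgreSQL. A minimal sketch of that rule as a pure function so it can be checked without a database; `pickVectorBackend` and `VectorBackend` are hypothetical names, not FastGPT exports:

```typescript
// Hypothetical names: FastGPT itself instantiates PgVectorCtrl / MilvusCtrl directly.
type VectorBackend = "pg" | "milvus";

type VectorEnv = {
  PG_URL?: string;
  MILVUS_ADDRESS?: string;
};

// Mirrors getVectorObj(): PostgreSQL wins whenever PG_URL is set,
// Milvus is used only if PG_URL is empty and MILVUS_ADDRESS is set,
// and PostgreSQL is the fallback when neither is configured.
function pickVectorBackend(env: VectorEnv): VectorBackend {
  if (env.PG_URL) return "pg";
  if (env.MILVUS_ADDRESS) return "milvus";
  return "pg";
}
```

One consequence: setting both variables does not enable both stores. Milvus is ignored, so a deployment that wants Milvus has to leave PG_URL unset.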
Environment variable configuration
// File: packages/service/common/vectorStore/constants.ts
export const DatasetVectorDbName = 'fastgpt';
export const DatasetVectorTableName = 'modeldata';
export const PG_ADDRESS = process.env.PG_URL; // PostgreSQL connection string
export const MILVUS_ADDRESS = process.env.MILVUS_ADDRESS; // Milvus address
export const MILVUS_TOKEN = process.env.MILVUS_TOKEN; // Milvus auth token
2. Unified Vector Operation Interface
Core operations
// File: packages/service/common/vectorStore/controller.ts
export const initVectorStore = Vector.init; // initialize the vector store
export const deleteDatasetDataVector = Vector.delete; // delete vectors
export const recallFromVectorStore = Vector.embRecall; // similarity search (recall)
export const getVectorDataByTime = Vector.getVectorDataByTime; // query by creation time
export const getVectorCountByTeamId = Vector.getVectorCountByTeamId; // vector count per team
export const getVectorCountByDatasetId = Vector.getVectorCountByDatasetId; // vector count per dataset
export const getVectorCountByCollectionId = Vector.getVectorCountByCollectionId; // vector count per collection
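Because every backend exposes the same method names, callers depend only on the shape of the controller. A sketch of that contract as a TypeScript interface, plus a toy in-memory implementation that ranks by inner product like both real backends. `VectorStoreCtrl` and `MemoryVectorCtrl` are hypothetical, and the method set is trimmed to insert, recall, and one count for brevity:

```typescript
// Hypothetical, trimmed-down version of the contract PgVectorCtrl and MilvusCtrl share.
type RecallItem = { id: string; collectionId: string; score: number };
type InsertArgs = { teamId: string; datasetId: string; collectionId: string; vector: number[] };
type RecallArgs = { teamId: string; datasetIds: string[]; vector: number[]; limit: number };

interface VectorStoreCtrl {
  insert(p: InsertArgs): Promise<{ insertId: string }>;
  embRecall(p: RecallArgs): Promise<{ results: RecallItem[] }>;
  getVectorCountByTeamId(teamId: string): Promise<number>;
}

type Row = InsertArgs & { id: string };

// Toy backend: brute-force inner-product search, no index.
class MemoryVectorCtrl implements VectorStoreCtrl {
  private rows: Row[] = [];
  private nextId = 1;

  insert = async (p: InsertArgs) => {
    const id = String(this.nextId++);
    this.rows.push({ id, ...p });
    return { insertId: id };
  };

  embRecall = async (p: RecallArgs) => {
    const dot = (a: number[], b: number[]) => a.reduce((s, x, i) => s + x * b[i], 0);
    const results = this.rows
      .filter((r) => r.teamId === p.teamId && p.datasetIds.includes(r.datasetId))
      .map((r) => ({ id: r.id, collectionId: r.collectionId, score: dot(r.vector, p.vector) }))
      .sort((a, b) => b.score - a.score) // higher inner product = more similar
      .slice(0, p.limit);
    return { results };
  };

  getVectorCountByTeamId = async (teamId: string) =>
    this.rows.filter((r) => r.teamId === teamId).length;
}

// Callers only see the interface, as controller.ts only re-exports Vector.* methods.
const Vector: VectorStoreCtrl = new MemoryVectorCtrl();
```

Declaring the methods as arrow-function class fields, as both FastGPT controllers do (`init = async () => ...`), is what makes `export const recallFromVectorStore = Vector.embRecall` safe: the detached function keeps its `this`.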
Vector insertion
export const insertDatasetDataVector = async ({
model, query, ...props
}: InsertVectorProps & {
query: string;
model: EmbeddingModelItemType;
}) => {
// 1. Embed the text
const { vectors, tokens } = await getVectorsByText({
model,
input: query,
type: 'db' // embedding for data being stored
});
// 2. Write the vector to the vector store
const { insertId } = await Vector.insert({
...props,
vector: vectors[0]
});
return { tokens, insertId };
};
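`insertDatasetDataVector` is a two-step pipeline: embed the text, then hand the first vector to whichever backend `Vector` points to, returning the token usage together with the new row ID. A sketch of the same composition with both steps injected so it runs without a model or a database; `makeInsertWithEmbedding`, `EmbedFn`, and `InsertFn` are hypothetical stand-ins for `getVectorsByText` and `Vector.insert`:

```typescript
// Hypothetical stand-ins for getVectorsByText() and Vector.insert().
type EmbedFn = (input: string) => Promise<{ vectors: number[][]; tokens: number }>;
type InsertFn = (p: {
  teamId: string;
  datasetId: string;
  collectionId: string;
  vector: number[];
}) => Promise<{ insertId: string }>;

function makeInsertWithEmbedding(embed: EmbedFn, store: InsertFn) {
  return async (p: { teamId: string; datasetId: string; collectionId: string; query: string }) => {
    const { query, ...ids } = p;
    // 1. Text -> embedding (one input string yields one vector)
    const { vectors, tokens } = await embed(query);
    if (vectors.length === 0) throw new Error("embedding returned no vectors");
    // 2. Persist only the first vector, as the original code does with vectors[0]
    const { insertId } = await store({ ...ids, vector: vectors[0] });
    return { tokens, insertId };
  };
}
```

Injecting the two steps keeps the pipeline testable; in FastGPT the embedding step is fixed to the configured model and the store step to the selected backend.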
3. PostgreSQL Vector Implementation
Database initialization
// File: packages/service/common/vectorStore/pg/class.ts
export class PgVectorCtrl {
init = async () => {
await connectPg();
// 1. Create the vector extension and the table
await PgClient.query(`
CREATE EXTENSION IF NOT EXISTS vector;
CREATE TABLE IF NOT EXISTS ${DatasetVectorTableName} (
id BIGSERIAL PRIMARY KEY,
vector VECTOR(1536) NOT NULL, -- 1536-dimensional vector
team_id VARCHAR(50) NOT NULL, -- team ID
dataset_id VARCHAR(50) NOT NULL, -- dataset ID
collection_id VARCHAR(50) NOT NULL, -- collection ID
createtime TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
`);
// 2. Create the HNSW vector index (inner-product operator class vector_ip_ops)
await PgClient.query(`
CREATE INDEX CONCURRENTLY IF NOT EXISTS vector_index
ON ${DatasetVectorTableName}
USING hnsw (vector vector_ip_ops)
WITH (m = 32, ef_construction = 128);
`);
// 3. Create the composite B-tree index on team, dataset, and collection
await PgClient.query(`
CREATE INDEX CONCURRENTLY IF NOT EXISTS team_dataset_collection_index
ON ${DatasetVectorTableName}
USING btree(team_id, dataset_id, collection_id);
`);
// 4. Create the index on creation time
await PgClient.query(`
CREATE INDEX CONCURRENTLY IF NOT EXISTS create_time_index
ON ${DatasetVectorTableName}
USING btree(createtime);
`);
};
}
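Note that the table DDL is sent as one multi-statement query while each `CREATE INDEX CONCURRENTLY` goes out as its own query. That split matters: PostgreSQL refuses to run `CREATE INDEX CONCURRENTLY` inside a transaction block, and a multi-statement query string runs as one implicit transaction. A minimal sketch of the pattern with the query function injected; `runInitStatements` and `QueryFn` are hypothetical names (in FastGPT, `PgClient.query` plays the `QueryFn` role), and the table columns are trimmed:

```typescript
// Hypothetical: any function that sends one SQL string to PostgreSQL.
type QueryFn = (sql: string) => Promise<unknown>;

const table = "modeldata";

// Statements that may share one round trip (nothing CONCURRENTLY here). Columns trimmed.
const ddl = `
  CREATE EXTENSION IF NOT EXISTS vector;
  CREATE TABLE IF NOT EXISTS ${table} (
    id BIGSERIAL PRIMARY KEY,
    vector VECTOR(1536) NOT NULL,
    createtime TIMESTAMP DEFAULT CURRENT_TIMESTAMP
  );
`;

// Each CONCURRENTLY index must be a standalone, top-level statement.
const indexes = [
  `CREATE INDEX CONCURRENTLY IF NOT EXISTS vector_index ON ${table} USING hnsw (vector vector_ip_ops) WITH (m = 32, ef_construction = 128);`,
  `CREATE INDEX CONCURRENTLY IF NOT EXISTS create_time_index ON ${table} USING btree (createtime);`,
];

async function runInitStatements(query: QueryFn): Promise<void> {
  await query(ddl);
  // One query per index, awaited in sequence; never concatenated into one string.
  for (const sql of indexes) {
    await query(sql);
  }
}
```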
Vector insertion implementation
insert = async (props: InsertVectorControllerProps): Promise<{ insertId: string }> => {
const { teamId, datasetId, collectionId, vector, retry = 3 } = props;
try {
const { rowCount, rows } = await PgClient.insert(DatasetVectorTableName, {
values: [[
{ key: 'vector', value: `[${vector}]` }, // vector in pgvector text format, e.g. [0.1,0.2,...]
{ key: 'team_id', value: String(teamId) }, // team ID
{ key: 'dataset_id', value: String(datasetId) }, // dataset ID
{ key: 'collection_id', value: String(collectionId) } // collection ID
]]
});
if (rowCount === 0) {
return Promise.reject('insertDatasetData: no insert');
}
return { insertId: rows[0].id };
} catch (error) {
if (retry <= 0) return Promise.reject(error);
await delay(500);
return this.insert({ ...props, retry: retry - 1 });
}
};
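The value `[${vector}]` relies on `Array.prototype.toString`, which joins numbers with commas and so produces pgvector's text input format. It performs no checks: a wrong dimension or a non-finite value is only rejected by the server at insert time. A hedged sketch of a stricter formatter; `toPgVectorLiteral` is hypothetical, and its default of 1536 is an assumption matching the `VECTOR(1536)` column above:

```typescript
// Hypothetical helper: validate before building pgvector's text form "[x1,x2,...]".
function toPgVectorLiteral(vector: number[], dim = 1536): string {
  if (vector.length !== dim) {
    throw new Error(`expected ${dim} dimensions, got ${vector.length}`);
  }
  if (!vector.every(Number.isFinite)) {
    throw new Error("vector contains NaN or Infinity");
  }
  return `[${vector.join(",")}]`;
}
```

With such a helper the insert would pass `value: toPgVectorLiteral(vector)` and fail fast in Node, before the retry loop spends three attempts on a request that can never succeed.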
Vector retrieval implementation
embRecall = async (props: EmbeddingRecallCtrlProps): Promise<EmbeddingRecallResponse> => {
const {
teamId, datasetIds, vector, limit,
forbidCollectionIdList, filterCollectionIdList, retry = 2
} = props;
// 1. Build the exclusion condition for forbidden collections
const formatForbidCollectionIdList = (() => {
if (!filterCollectionIdList) return forbidCollectionIdList;
return forbidCollectionIdList
.map(id => String(id))
.filter(id => !filterCollectionIdList.includes(id));
})();
const forbidCollectionSql = formatForbidCollectionIdList.length > 0 ?
`AND collection_id NOT IN (${formatForbidCollectionIdList.map(id => `'${id}'`).join(',')})` : '';
// 2. Build the allow-list condition (allowed collections minus forbidden ones)
const formatFilterCollectionId = filterCollectionIdList ?
filterCollectionIdList
.map(id => String(id))
.filter(id => !forbidCollectionIdList.includes(id)) : undefined;
const filterCollectionIdSql = formatFilterCollectionId ?
`AND collection_id IN (${formatFilterCollectionId.map(id => `'${id}'`).join(',')})` : '';
// 3. Nothing can match if the allow-list ended up empty
if (formatFilterCollectionId && formatFilterCollectionId.length === 0) {
return { results: [] };
}
try {
// 4. Run the similarity search; SET LOCAL scopes ef_search to this transaction
const results: any = await PgClient.query(`
BEGIN;
SET LOCAL hnsw.ef_search = ${global.systemEnv?.pgHNSWEfSearch || 100};
SELECT id, collection_id, vector <#> '[${vector}]' AS score
FROM ${DatasetVectorTableName}
WHERE team_id='${teamId}'
AND dataset_id IN (${datasetIds.map(id => `'${String(id)}'`).join(',')})
${filterCollectionIdSql}
${forbidCollectionSql}
ORDER BY score
LIMIT ${limit};
COMMIT;
`);
const rows = results?.[2]?.rows as PgSearchRawType[]; // results[0] = BEGIN, [1] = SET, [2] = SELECT
return {
results: rows.map(item => ({
id: String(item.id),
collectionId: item.collection_id,
score: item.score * -1 // <#> yields the negative inner product; negate it to get the similarity
}))
};
} catch (error) {
if (retry <= 0) return Promise.reject(error);
return this.embRecall({ ...props, retry: retry - 1 });
}
};
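The recall query splices IDs directly into the SQL text, which is safe only while every ID is a trusted, quote-free string. A sketch of the same filter logic using bind parameters instead, assuming node-postgres-style `$n` placeholders (node-postgres serializes JS arrays as PostgreSQL arrays); `resolveCollectionFilters` and `buildPgRecallQuery` are hypothetical, and the `BEGIN` / `SET LOCAL hnsw.ef_search` / `COMMIT` wrapper is left out for brevity:

```typescript
type RecallFilterInput = {
  teamId: string;
  datasetIds: string[];
  vector: number[];
  limit: number;
  forbidCollectionIdList: string[];
  filterCollectionIdList?: string[];
};

// Same set logic as embRecall: with an allow-list, results are limited to
// (allow-list minus forbid list); forbid IDs also in the allow-list are dropped from it.
function resolveCollectionFilters(forbid: string[], filter?: string[]) {
  if (!filter) return { forbidIds: forbid, filterIds: undefined };
  const allow = filter; // narrowed to string[] here
  return {
    forbidIds: forbid.filter((id) => !allow.includes(id)),
    filterIds: allow.filter((id) => !forbid.includes(id)),
  };
}

function buildPgRecallQuery(p: RecallFilterInput, table = "modeldata") {
  const { forbidIds, filterIds } = resolveCollectionFilters(
    p.forbidCollectionIdList.map(String),
    p.filterCollectionIdList?.map(String)
  );
  // Nothing can match: the caller should return { results: [] } without querying.
  if (filterIds && filterIds.length === 0) return null;

  const values: unknown[] = [`[${p.vector.join(",")}]`, p.teamId, p.datasetIds.map(String)];
  let sql =
    `SELECT id, collection_id, vector <#> $1::vector AS score FROM ${table} ` +
    `WHERE team_id = $2 AND dataset_id = ANY($3)`;
  if (filterIds) {
    values.push(filterIds);
    sql += ` AND collection_id = ANY($${values.length})`;
  }
  if (forbidIds.length > 0) {
    values.push(forbidIds);
    sql += ` AND collection_id <> ALL($${values.length})`;
  }
  values.push(p.limit);
  sql += ` ORDER BY score LIMIT $${values.length}`;
  return { text: sql, values };
}
```

The returned `{ text, values }` object is the shape node-postgres's `query` method accepts, so only the table name remains interpolated.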
Vector deletion implementation
delete = async (props: DelDatasetVectorCtrlProps): Promise<any> => {
const { teamId, retry = 2 } = props;
const teamIdWhere = `team_id='${String(teamId)}' AND`;
// Build the WHERE clause (always scoped to the team)
const where = await (() => {
if ('id' in props && props.id)
return `${teamIdWhere} id=${props.id}`;
if ('datasetIds' in props && props.datasetIds) {
const datasetIdWhere = `dataset_id IN (${props.datasetIds
.map(id => `'${String(id)}'`).join(',')})`;
if ('collectionIds' in props && props.collectionIds) {
return `${teamIdWhere} ${datasetIdWhere} AND collection_id IN (${props.collectionIds
.map(id => `'${String(id)}'`).join(',')})`;
}
return `${teamIdWhere} ${datasetIdWhere}`;
}
if ('idList' in props && Array.isArray(props.idList)) {
if (props.idList.length === 0) return;
return `${teamIdWhere} id IN (${props.idList.map(id => String(id)).join(',')})`;
}
return Promise.reject('deleteDatasetData: no where');
})();
if (!where) return;
try {
await PgClient.delete(DatasetVectorTableName, { where: [where] });
} catch (error) {
if (retry <= 0) return Promise.reject(error);
await delay(500);
return this.delete({ ...props, retry: retry - 1 });
}
};
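`insert`, `embRecall`, and `delete` all retry by calling themselves with `retry - 1`; only `insert` and `delete` wait 500 ms between attempts. The same policy can be factored into one loop-based helper. A sketch; `withRetry` and its parameters are hypothetical, not part of FastGPT:

```typescript
const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Runs fn once, then up to `retries` more times on failure, waiting delayMs between attempts.
// retries = 3 gives at most 4 attempts, matching `retry = 3` in the recursive insert.
async function withRetry<T>(fn: () => Promise<T>, retries: number, delayMs = 500): Promise<T> {
  let attempt = 0;
  while (true) {
    try {
      return await fn();
    } catch (error) {
      if (attempt >= retries) throw error; // out of retries: surface the last error
      attempt++;
      if (delayMs > 0) await sleep(delayMs);
    }
  }
}
```

A loop also avoids rebuilding the WHERE clause on every attempt, which the recursive `this.delete({ ...props, retry: retry - 1 })` call does.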
4. Milvus Vector Implementation
Client connection
// File: packages/service/common/vectorStore/milvus/class.ts
export class MilvusCtrl {
getClient = async () => {
if (!MILVUS_ADDRESS) {
return Promise.reject('MILVUS_ADDRESS is not set');
}
if (global.milvusClient) return global.milvusClient;
global.milvusClient = new MilvusClient({
address: MILVUS_ADDRESS,
token: MILVUS_TOKEN
});
return global.milvusClient;
};
}
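`getClient` is a lazily created, process-wide singleton: the client is cached on `global`, so it is constructed once and reused, presumably also across module reloads in development. A generic sketch of the pattern; `getOrCreateGlobal` is a hypothetical helper, and the factory stands in for `new MilvusClient({ address, token })`:

```typescript
// Hypothetical generic version of the global-cache pattern used by getClient().
function getOrCreateGlobal<T>(key: string, create: () => T): T {
  const store = globalThis as unknown as Record<string, T | undefined>;
  const existing = store[key];
  if (existing !== undefined) return existing;
  const created = create(); // synchronous, so concurrent callers cannot create two
  store[key] = created;
  return created;
}
```

Because construction is synchronous, two concurrent `getClient()` calls cannot produce two clients. If the factory were asynchronous, the pending promise itself would need to be cached to keep that guarantee.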
Collection initialization
init = async () => {
const client = await this.getClient();
// 1. Create the database if missing, then switch to it
try {
const { db_names } = await client.listDatabases();
if (!db_names.includes(DatasetVectorDbName)) {
await client.createDatabase({ db_name: DatasetVectorDbName });
}
await client.useDatabase({ db_name: DatasetVectorDbName });
} catch (error) {}
// 2. Create the collection if it does not exist
const { value: hasCollection } = await client.hasCollection({
collection_name: DatasetVectorTableName
});
if (!hasCollection) {
await client.createCollection({
collection_name: DatasetVectorTableName,
description: 'Store dataset vector',
enableDynamicField: true,
fields: [
{
name: 'id',
data_type: DataType.Int64,
is_primary_key: true,
autoID: false
},
{
name: 'vector',
data_type: DataType.FloatVector,
dim: 1536 // 1536-dimensional vector
},
{ name: 'teamId', data_type: DataType.VarChar, max_length: 64 },
{ name: 'datasetId', data_type: DataType.VarChar, max_length: 64 },
{ name: 'collectionId', data_type: DataType.VarChar, max_length: 64 },
{ name: 'createTime', data_type: DataType.Int64 }
],
index_params: [
{
field_name: 'vector',
index_name: 'vector_HNSW',
index_type: 'HNSW',
metric_type: 'IP', // inner-product similarity
params: { efConstruction: 32, M: 64 }
},
{ field_name: 'teamId', index_type: 'Trie' },
{ field_name: 'datasetId', index_type: 'Trie' },
{ field_name: 'collectionId', index_type: 'Trie' },
{ field_name: 'createTime', index_type: 'STL_SORT' }
]
});
}
// 3. Load the collection into memory if it is not loaded yet
const { state: colLoadState } = await client.getLoadState({
collection_name: DatasetVectorTableName
});
if (colLoadState === LoadState.LoadStateNotExist ||
colLoadState === LoadState.LoadStateNotLoad) {
await client.loadCollectionSync({
collection_name: DatasetVectorTableName
});
}
};
Milvus vector retrieval
embRecall = async (props: EmbeddingRecallCtrlProps): Promise<EmbeddingRecallResponse> => {
const client = await this.getClient();
const {
teamId, datasetIds, vector, limit,
forbidCollectionIdList, filterCollectionIdList, retry = 2
} = props;
// 1. Build the filter expressions
const formatForbidCollectionIdList = (() => {
if (!filterCollectionIdList) return forbidCollectionIdList;
return forbidCollectionIdList
.map(id => String(id))
.filter(id => !filterCollectionIdList.includes(id));
})();
const forbidColQuery = formatForbidCollectionIdList.length > 0 ?
`and (collectionId not in [${formatForbidCollectionIdList.map(id => `"${id}"`).join(',')}])` : '';
const formatFilterCollectionId = filterCollectionIdList ?
filterCollectionIdList
.map(id => String(id))
.filter(id => !forbidCollectionIdList.includes(id)) : undefined;
const collectionIdQuery = formatFilterCollectionId ?
`and (collectionId in [${formatFilterCollectionId.map(id => `"${id}"`).join(',')}])` : ''; // explicit join, same output as the implicit array-to-string
// 2. Nothing can match if the allow-list ended up empty
if (formatFilterCollectionId && formatFilterCollectionId.length === 0) {
return { results: [] };
}
try {
// 3. Run the vector search
const { results } = await client.search({
collection_name: DatasetVectorTableName,
data: vector,
limit,
filter: `(teamId == "${teamId}") and (datasetId in [${datasetIds.map(id => `"${id}"`).join(',')}]) ${collectionIdQuery} ${forbidColQuery}`,
output_fields: ['collectionId']
});
const rows = results as {
score: number;
id: string;
collectionId: string;
}[];
return {
results: rows.map(item => ({
id: String(item.id),
collectionId: item.collectionId,
score: item.score
}))
};
} catch (error) {
if (retry <= 0) return Promise.reject(error);
return this.embRecall({ ...props, retry: retry - 1 });
}
};
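The Milvus filter is a boolean expression string built from `==`, `in`, `not in`, and `and`, using the same forbid/allow set logic as the PostgreSQL path. Since IDs are interpolated into that string, a stray quote in an ID would change the expression. A sketch that builds the same filter but first rejects IDs outside a conservative character set; `buildMilvusFilter`, `quoteId`, and the `SAFE_ID` pattern are assumptions, on the premise that FastGPT's team, dataset, and collection IDs are plain alphanumeric strings:

```typescript
const SAFE_ID = /^[A-Za-z0-9_-]+$/; // assumption: valid IDs never need escaping

function quoteId(id: string): string {
  if (!SAFE_ID.test(id)) throw new Error(`unsafe id for Milvus filter: ${id}`);
  return `"${id}"`;
}

const quoteList = (ids: string[]) => `[${ids.map(quoteId).join(",")}]`;

// Returns null when nothing can match (the early `return { results: [] }` case).
function buildMilvusFilter(p: {
  teamId: string;
  datasetIds: string[];
  forbidCollectionIdList: string[];
  filterCollectionIdList?: string[];
}): string | null {
  const forbid = p.forbidCollectionIdList.map(String);
  const allow = p.filterCollectionIdList?.map(String);
  const forbidIds = allow ? forbid.filter((id) => !allow.includes(id)) : forbid;
  const filterIds = allow ? allow.filter((id) => !forbid.includes(id)) : undefined;
  if (filterIds && filterIds.length === 0) return null;

  const parts = [`(teamId == ${quoteId(p.teamId)})`, `(datasetId in ${quoteList(p.datasetIds)})`];
  if (filterIds) parts.push(`(collectionId in ${quoteList(filterIds)})`);
  if (forbidIds.length > 0) parts.push(`(collectionId not in ${quoteList(forbidIds)})`);
  return parts.join(" and ");
}
```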
5. Vector Data Type Definitions
Core types
// File: packages/service/common/vectorStore/type.d.ts
declare global {
var pgClient: Pool | null;
var milvusClient: MilvusClient | null;
}
export type EmbeddingRecallItemType = {
id: string; // vector ID
collectionId: string; // collection ID
score: number; // similarity score (higher = more similar)
};
Operation parameter types
// File: packages/service/common/vectorStore/controller.d.ts
export type InsertVectorProps = {
teamId: string; // team ID
datasetId: string; // dataset ID
collectionId: string; // collection ID
};
export type EmbeddingRecallProps = {
teamId: string; // team ID
datasetIds: string[]; // datasets to search
forbidCollectionIdList: string[]; // collections to exclude
filterCollectionIdList?: string[]; // optional allow-list of collections
};
export type DeleteDatasetVectorProps = (
| { id: string } // delete a single vector by ID
| { datasetIds: string[]; collectionIds?: string[] } // delete by dataset, optionally narrowed to collections
| { idList: string[] } // delete a batch of IDs
) & {
teamId: string;
};
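`DeleteDatasetVectorProps` is a union of object shapes with no shared tag field, so the controllers narrow it with `'id' in props`-style checks, as the PostgreSQL `delete` does. A sketch of that narrowing producing a parameterized WHERE clause instead of an interpolated one; `buildDeleteWhere` is hypothetical and the `$n` placeholders assume node-postgres:

```typescript
type DeleteDatasetVectorProps = (
  | { id: string }
  | { datasetIds: string[]; collectionIds?: string[] }
  | { idList: string[] }
) & { teamId: string };

// Returns null when there is nothing to delete (empty idList), mirroring the early return.
function buildDeleteWhere(props: DeleteDatasetVectorProps): { where: string; values: unknown[] } | null {
  const values: unknown[] = [props.teamId]; // every branch is scoped to the team
  if ("id" in props) {
    values.push(props.id);
    return { where: "team_id = $1 AND id = $2", values };
  }
  if ("datasetIds" in props) {
    values.push(props.datasetIds);
    let where = "team_id = $1 AND dataset_id = ANY($2)";
    if (props.collectionIds) {
      values.push(props.collectionIds);
      where += " AND collection_id = ANY($3)";
    }
    return { where, values };
  }
  // Only the idList shape is left here.
  if (props.idList.length === 0) return null;
  values.push(props.idList);
  return { where: "team_id = $1 AND id = ANY($2)", values };
}
```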
6. Vector Retrieval Tuning
HNSW parameter tuning
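// PostgreSQL HNSW build parameters (fixed when the index is created)
WITH (m = 32, ef_construction = 128)
// PostgreSQL search-time candidate list size, scoped to the transaction by SET LOCAL
SET LOCAL hnsw.ef_search = ${global.systemEnv?.pgHNSWEfSearch || 100};
// Milvus HNSW build parameters
params: { efConstruction: 32, M: 64 }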
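`m` and `ef_construction` (`M` and `efConstruction` in Milvus) are fixed at index build time, whereas `hnsw.ef_search` is chosen per query. In pgvector, `ef_search` is the size of the candidate list an HNSW scan keeps, so without the iterative index scans added in pgvector 0.8.0 a query returns at most that many rows from the index. A hedged sketch of choosing the per-query value; `efSearchFor` is hypothetical, 100 is the source's fallback, and 1 to 1000 is the range pgvector accepts for `hnsw.ef_search`:

```typescript
const PGVECTOR_EF_SEARCH_MIN = 1;
const PGVECTOR_EF_SEARCH_MAX = 1000;

// Keep the candidate list at least as large as LIMIT, within pgvector's allowed range.
function efSearchFor(limit: number, configured?: number): number {
  const base = configured || 100; // mirrors `global.systemEnv?.pgHNSWEfSearch || 100`
  const wanted = Math.max(base, limit);
  return Math.min(PGVECTOR_EF_SEARCH_MAX, Math.max(PGVECTOR_EF_SEARCH_MIN, Math.ceil(wanted)));
}
```

Because pgvector applies WHERE filters after the approximate index scan, a team owning a small share of the table can still get fewer than `limit` hits even then; raising `pgHNSWEfSearch` is the knob the source exposes for that.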
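Similarity computation
- PostgreSQL: the <#> operator returns the negative inner product (so that ascending ORDER BY puts the best match first); the score is multiplied by -1 to turn it into a similarity.
- Milvus: the IP (Inner Product) metric returns the similarity score directly.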
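A small numeric check of the two conventions: after the PostgreSQL path negates `<#>`, both backends report the same number, and for unit-length vectors that number equals cosine similarity. All function names are illustrative:

```typescript
const dot = (a: number[], b: number[]) => a.reduce((sum, x, i) => sum + x * b[i], 0);
const norm = (a: number[]) => Math.sqrt(dot(a, a));

// What pgvector's <#> operator returns: the negative inner product.
const pgNegativeInnerProduct = (a: number[], b: number[]) => -dot(a, b);

// What Milvus returns for metric_type "IP": the inner product itself.
const milvusIpScore = (a: number[], b: number[]) => dot(a, b);

const cosine = (a: number[], b: number[]) => dot(a, b) / (norm(a) * norm(b));

// Scale a vector to length 1.
const unit = (a: number[]) => a.map((x) => x / norm(a));
```

For [3, 4] and [4, 3] normalized, both scores come out as 0.96, the cosine of the original pair. The equivalence only holds when stored and query vectors are normalized; otherwise inner-product ranking also rewards vector length.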
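Indexing strategy
- Vector index: HNSW, an approximate nearest-neighbor index that trades a small loss in recall for much faster search
- Scalar indexes: team, dataset, and collection IDs (one composite B-tree in PostgreSQL, separate Trie indexes in Milvus)
- Time index: createtime / createTime (B-tree in PostgreSQL, STL_SORT in Milvus) for time-range queries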
7. Statistics and Monitoring
Vector counts
// Count by team
getVectorCountByTeamId = async (teamId: string) => {
return await PgClient.count(DatasetVectorTableName, {
where: [['team_id', String(teamId)]]
});
};
// Count by dataset
getVectorCountByDatasetId = async (teamId: string, datasetId: string) => {
return await PgClient.count(DatasetVectorTableName, {
where: [
['team_id', String(teamId)], 'and',
['dataset_id', String(datasetId)]
]
});
};
Time-range query
getVectorDataByTime = async (start: Date, end: Date) => {
const { rows } = await PgClient.query(`
SELECT id, team_id, dataset_id
FROM ${DatasetVectorTableName}
WHERE createtime BETWEEN '${dayjs(start).format('YYYY-MM-DD HH:mm:ss')}'
AND '${dayjs(end).format('YYYY-MM-DD HH:mm:ss')}';
`);
return rows.map(item => ({
id: String(item.id),
teamId: item.team_id,
datasetId: item.dataset_id
}));
};
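`getVectorDataByTime` formats both dates with dayjs as `YYYY-MM-DD HH:mm:ss` strings in local time and splices them into the SQL. An alternative sketch passes the `Date` objects as bind parameters and validates them first; `buildTimeRangeQuery` is hypothetical and the `$1`/`$2` placeholders assume node-postgres:

```typescript
// Hypothetical parameterized variant of getVectorDataByTime's query.
function buildTimeRangeQuery(start: Date, end: Date, table = "modeldata") {
  if (Number.isNaN(start.getTime()) || Number.isNaN(end.getTime())) {
    throw new Error("invalid date");
  }
  if (start.getTime() > end.getTime()) {
    throw new Error("start must not be after end");
  }
  return {
    text: `SELECT id, team_id, dataset_id FROM ${table} WHERE createtime BETWEEN $1 AND $2`,
    values: [start, end], // serialized by the driver, not by string formatting
  };
}
```

How the bounds line up with the `TIMESTAMP` (without time zone) column then depends on the driver's date serialization, which is worth checking in any deployment whose servers do not share one time zone.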
Summary
FastGPT's vector layer combines multi-database support, a unified interface, and fast approximate retrieval:
Key features
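- Multiple backends: PostgreSQL (pgvector) or Milvus
- Unified abstraction: each vector database implements the same interface
- Automatic selection: the backend is chosen from environment variables (PG_URL first, then MILVUS_ADDRESS)
- High-performance indexing: HNSW indexes for approximate nearest-neighbor search
- Flexible filtering: results can be scoped by team, dataset, and collection
- Retries: insert, recall, and delete retry automatically on failure
- Statistics: vector counts per team, dataset, and collection, plus time-range queries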
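Technical advantages
- PostgreSQL: mature and stable with a rich SQL ecosystem; suited to small and medium deployments
- Milvus: a dedicated vector database built for high performance; suited to large-scale deployments
- Unified interface: switching backends requires no changes to calling code, lowering migration cost (the stored vectors still have to be migrated)
- Tuned configuration: HNSW build and search parameters are set separately for each backend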
