写后台系统的时候,有一种性能问题特别隐蔽:列表页只是多显示几个名称,数据库查询次数却突然翻倍。

比如订单列表。

订单表里有 customer_uuid,页面上要展示客户名;订单还可能来自服务请求,页面上又要展示服务项目名。需求看起来很普通,于是很容易写出这种代码:

// 伪代码:别这么写
let orders = OrderEntity::find()
    .offset(...)
    .limit(...)
    .all(db)
    .await?;

for order in orders {
    let customer = ContactEntity::find()
        .filter(ContactColumn::ContactUuid.eq(order.customer_uuid))
        .one(db)
        .await?;

    let request = ServiceRequestEntity::find()
        .filter(ServiceRequestColumn::Uuid.eq(order.request_id))
        .one(db)
        .await?;

    // 再根据 request.service_catalog_uuid 查服务项目...
}

这段代码最大的问题不是“慢”,而是慢得很稳定地不可控

分页 20 条时,看起来还能忍;分页 100 条、字段再多几个、网络抖一下,列表页就开始卡。更麻烦的是,这类问题经常在开发环境看不出来,因为本地数据太少、数据库就在旁边。

所以我后来在 Pico-CRM 的查询层定了一个很朴素的习惯:

列表页里不要在 for / map 里查数据库。

一、N+1 在订单列表里长什么样

Pico-CRM 是一个家政服务 CRM,核心流程是:

客户 -> 服务需求 -> 订单 -> 排班 -> 完工

订单列表的主数据来自 orders 表,但页面并不只看订单本身,还要展示:

  • 客户姓名:来自 contacts
  • 服务项目:订单关联服务请求,再由服务请求关联 service_catalogs
  • 订单状态、时间、金额等订单字段

也就是说,一行订单 DTO 里混了几类展示字段。

如果按照逐行查询来算,分页 20 条时,最差情况大概是:

1 次订单分页查询
+ 20 次客户查询
+ 20 次服务请求查询
+ 20 次服务项目查询
= 61 次查询

分页大小一改,查询次数跟着涨。

这就是典型的 N+1。

这里的 “N” 不是一个抽象概念,而是后台列表页的分页条数。产品觉得 20 条太少,想改成 50 条,接口压力就会马上变大。

二、先分页,再批量补字段

Pico-CRM 里订单列表的实现放在:

backend/src/infrastructure/queries/crm/order_query_impl.rs

真实流程是这样:

let total = select
    .clone()
    .count(txn)
    .await
    .map_err(|e| format!("query orders count error: {}", e))?;

let models = select
    .order_by_desc(Column::InsertedAt)
    .offset(Some((query.page - 1) * query.page_size))
    .limit(Some(query.page_size))
    .all(txn)
    .await
    .map_err(|e| format!("query orders error: {}", e))?;

第一步还是正常的主表分页:先查总数,再查当前页数据。

重点在第二步:只对当前页数据收集关联 ID。

let customer_ids: HashSet<Uuid> = models
    .iter()
    .filter_map(|model| model.customer_uuid)
    .collect();

let request_ids: HashSet<Uuid> =
    models.iter().filter_map(|model| model.request_id).collect();

这里用 HashSet 有两个好处:

  • 自动去重,避免同一个客户被查多次
  • 当前页没有关联数据时,可以直接跳过后续查询

然后客户信息一次查完:

let mut customer_map: HashMap<Uuid, String> = HashMap::new();

if !customer_ids.is_empty() {
    let customers = ContactEntity::find()
        .filter(ContactColumn::ContactUuid.is_in(customer_ids.clone()))
        .all(txn)
        .await
        .map_err(|e| format!("query customers error: {}", e))?;

    for customer in customers {
        customer_map.insert(customer.contact_uuid, customer.user_name);
    }
}

这段代码的核心就两件事:

WHERE contact_uuid IN (...)
查询结果塞进 HashMap<Uuid, String>

后面回填 DTO 时,直接按 ID 查内存 map:

if let Some(customer_uuid) = customer_uuid {
    view.customer_name = customer_map.get(&customer_uuid).cloned();
}

查询次数不会随着订单条数线性增长。

三、关联链路多一层,也不要回到逐条查

客户名只隔一张表,比较简单。

服务项目名稍微绕一点:订单里不是直接存 service_catalog_uuid,而是存了 request_id。要拿服务项目名,需要走:

orders.request_id
  -> service_requests.uuid
  -> service_requests.service_catalog_uuid
  -> service_catalogs.name

这时很多人会下意识写成循环里查两次。

Pico-CRM 里还是同一个套路,只是拆成两段批量查询。

先批量查服务请求,并建立 request_id -> service_catalog_uuid 的映射:

let mut request_service_catalog_map: HashMap<Uuid, Option<Uuid>> =
    HashMap::new();

if !request_ids.is_empty() {
    let requests = ServiceRequestEntity::find()
        .filter(ServiceRequestColumn::Uuid.is_in(request_ids.clone()))
        .all(txn)
        .await
        .map_err(|e| format!("query service requests error: {}", e))?;

    for request in requests {
        request_service_catalog_map.insert(
            request.uuid,
            request.service_catalog_uuid,
        );
    }
}

再从这些请求里收集服务项目 ID:

let service_catalog_ids: HashSet<Uuid> = request_service_catalog_map
    .values()
    .filter_map(|value| *value)
    .collect();

最后批量查服务目录:

let mut service_catalog_map: HashMap<Uuid, String> = HashMap::new();

if !service_catalog_ids.is_empty() {
    let catalogs = ServiceCatalogEntity::find()
        .filter(ServiceCatalogColumn::Uuid.is_in(service_catalog_ids.clone()))
        .all(txn)
        .await
        .map_err(|e| format!("query service catalogs error: {}", e))?;

    for catalog in catalogs {
        service_catalog_map.insert(catalog.uuid, catalog.name);
    }
}

回填时再把两层 map 串起来:

if let Some(request_id) = request_id {
    if let Some(service_catalog_uuid) = request_service_catalog_map
        .get(&request_id)
        .and_then(|value| *value)
    {
        view.service_catalog_uuid = Some(service_catalog_uuid.to_string());
        view.service_catalog_name =
            service_catalog_map.get(&service_catalog_uuid).cloned();
    }
}

这样一来,订单列表的查询次数变成了稳定的几次:

1 次订单 count
1 次订单分页
1 次客户批量查询
1 次服务请求批量查询
1 次服务目录批量查询

分页 20 条是这些查询,分页 100 条还是这些查询。

当然,IN (...) 的参数数量也不是无限的。但后台分页列表通常是几十到一两百条,放在这个场景里,比每行查一次稳得多。

四、为什么不直接写一个大 join

这个问题很自然:既然都是查关联字段,为什么不直接 SQL join?

我的答案是:能 join,但不是所有列表页都值得 join 成一条 SQL。

订单列表这里有几个特点:

  • 主表必须先按订单条件分页
  • 客户名、服务项目名只是展示字段
  • 服务项目隔了一层服务请求
  • DTO 回填时还要保留一些 Option 语义
  • 查询层本来就是读模型适配层,不是领域规则所在

如果用 join,也能做。但 SQL 会越来越长,SeaORM 查询表达式也会更重。后面再加一个展示字段,比如创建人、派工人、服务人员,join 链会继续膨胀。

而现在这种写法的结构很清楚:

主表分页
-> 收集本页关联 ID
-> 按表批量查询
-> HashMap 回填 DTO

它牺牲了一点“单 SQL 的纯粹”,换来的是代码可读性和稳定查询次数。

我不是反对 join。像强过滤、强排序、必须由数据库完成聚合的场景,就应该让 SQL 来做。

但对于后台列表页的展示字段补齐,尤其是“分页以后补名称”的场景,内存 join 很好用。

五、服务请求列表也用了同一套模式

同样的实现也出现在:

backend/src/infrastructure/queries/crm/service_request_query_impl.rs

服务请求列表要展示三类名称:

  • 客户姓名:contacts
  • 创建人姓名:users
  • 服务项目名:service_catalogs

代码先从当前页模型里收集三组 ID:

let customer_ids: HashSet<Uuid> =
    models.iter().map(|model| model.customer_uuid).collect();

let user_ids: HashSet<Uuid> =
    models.iter().map(|model| model.creator_uuid).collect();

let service_catalog_ids: HashSet<Uuid> = models
    .iter()
    .filter_map(|model| model.service_catalog_uuid)
    .collect();

然后分别用 is_in 批量查三张表,构建三个 map:

let mut contact_map: HashMap<Uuid, String> = HashMap::new();
let mut user_map: HashMap<Uuid, String> = HashMap::new();
let mut service_catalog_map: HashMap<Uuid, String> = HashMap::new();

最后统一回填:

view.contact_name = contact_map.get(&customer_uuid).cloned();
view.creator_name = user_map.get(&creator_uuid).cloned();
view.service_catalog_name = service_catalog_uuid
    .and_then(|uuid| service_catalog_map.get(&uuid).cloned());

这就是我比较喜欢的代码形态:读起来很笨,但每一步都明确。

谁负责分页,谁负责批量查,谁负责 DTO 字段回填,都能一眼看出来。

六、员工绩效统计:不只是补名称,聚合也能这么做

is_in + HashMap 不只适合补名称。

Pico-CRM 的员工列表里有一段绩效统计,代码在:

backend/src/infrastructure/queries/identity/user_query_impl.rs

它要算每个家政人员的:

  • 已完成服务次数
  • 评价数
  • 平均评分
  • 售后数
  • 投诉数
  • 退款数
  • 返工数

这里如果逐个员工查一遍排班、评价、售后,查询次数会更夸张。

所以实现里先对当前页员工 ID 批量查已完成排班:

let schedule_items = ScheduleEntity::find()
    .filter(ScheduleColumn::AssignedUserUuid.is_in(user_ids.iter().copied()))
    .filter(ScheduleColumn::Status.eq("done"))
    .all(txn)
    .await
    .map_err(|e| format!("query user completed schedules error: {}", e))?;

评价也批量查:

let feedback_items = OrderFeedbackEntity::find()
    .filter(OrderFeedbackColumn::UserUuid.is_in(user_ids.iter().copied()))
    .all(txn)
    .await
    .map_err(|e| format!("query user feedback stats error: {}", e))?;

然后把订单和员工关系做成 map:

let schedule_order_map = schedule_items
    .iter()
    .map(|item| (item.order_uuid, item.assigned_user_uuid))
    .collect::<HashMap<_, _>>();

售后表是按订单关联的,于是先从 schedule_order_map 收集订单 ID,再批量查售后:

let order_ids = schedule_order_map.keys().copied().collect::<Vec<_>>();

let after_sales_cases = if order_ids.is_empty() {
    Vec::new()
} else {
    AfterSalesEntity::find()
        .filter(AfterSalesColumn::OrderUuid.is_in(order_ids.clone()))
        .all(txn)
        .await
        .map_err(|e| format!("query user after sales cases error: {}", e))?
};

统计结果则落到:

let mut stats_map = HashMap::<Uuid, UserPerformanceStats>::new();

后面不再查数据库,而是在内存里累加:

for item in schedule_items {
    let entry = stats_map.entry(item.assigned_user_uuid).or_default();
    entry.completed_service_count += 1;
}

平均评分也是先收集,再统一计算:

let mut rating_sums = HashMap::<Uuid, (i64, u64)>::new();

for item in feedback_items {
    let Some(user_uuid) = item.user_uuid else {
        continue;
    };

    let entry = stats_map.entry(user_uuid).or_default();
    entry.feedback_count += 1;

    if let Some(rating) = item.rating {
        let rating_entry = rating_sums.entry(user_uuid).or_insert((0, 0));
        rating_entry.0 += rating as i64;
        rating_entry.1 += 1;
    }
}

这段和订单列表本质一样:

当前页主实体
-> 收集关联键
-> 批量查相关表
-> 用 HashMap 归并结果

只不过订单列表是回填展示字段,员工绩效是回填统计字段。

七、这套写法的边界

这个模式很好用,但不是银弹。

我现在大概按这几个标准判断。

适合用 is_in + HashMap 的场景:

  • 主表已经分页
  • 关联字段只是展示或轻量统计
  • 关联数据量跟分页大小同级
  • 业务上允许在应用层组装 DTO
  • 查询可读性比“单条 SQL”更重要

更适合 SQL join / group by 的场景:

  • 需要按关联表字段过滤
  • 需要按关联表字段排序
  • 聚合数据量远大于当前页
  • 需要数据库利用索引和执行计划完成复杂计算
  • 返回数据必须严格由一条一致性查询保证

还有一个细节:Pico-CRM 的这些查询基本都包在 with_shared_txn() 里,同一个列表查询里的多次读取发生在一个事务上下文中。

这不是为了解决所有一致性问题,而是让一次查询组装过程的数据库访问边界更清晰。

总结

N+1 查询不一定要靠复杂框架解决。

在很多后台系统里,最实用的办法反而是这三步:

HashSet 收 ID
SeaORM is_in 批量查
HashMap 回填 DTO

Pico-CRM 里的订单列表、服务请求列表、员工绩效统计都用了这个套路。它没有什么炫技的地方,但能把“分页越大查询越多”的问题,压回到稳定的几次数据库访问。

我现在看列表页接口,第一眼就会搜一件事:

有没有在循环里查数据库。

如果有,基本就该停下来,把 ID 先收集起来。

项目开源在 GitHub,搜 Pico-CRM 可以看到完整代码。

你们项目里的列表页关联字段,是习惯直接 join,还是应用层批量补字段?评论区聊聊。

更多推荐