别急着优化 Python:一次“代码很慢,其实是数据库/网络慢”的排查复盘

管理层催优化,业务方说页面卡,团队群里已经开始热火朝天地讨论:“是不是 Python 太慢?”“要不要把循环改成列表推导式?”“是不是该上异步?”“要不要重构订单模块?”

这些声音都很熟悉。作为工程师,我们很容易把“慢”理解成“代码慢”,然后立刻钻进函数、循环、对象创建、字符串拼接里寻找答案。但高级工程师真正的价值,往往不在于第一时间写出更复杂的优化代码,而在于先问一句:

慢,到底慢在哪里?

这篇文章讲一个我会如何排查“看起来像 Python 慢,其实是数据库/网络慢”的真实工作型案例。重点不是炫技,而是给出一套可落地的方法:如何定位瓶颈、如何说服团队停下盲目优化、如何用数据推动正确决策。


一、场景:订单处理系统突然变慢

假设我们维护一个订单处理服务,核心逻辑大致如下:

def process_orders():
    orders = get_pending_orders()

    for order in orders:
        user = get_user(order.user_id)
        inventory = get_inventory(order.sku_id)

        if inventory.stock > 0:
            create_shipment(order)
            update_inventory(order.sku_id, -1)
            notify_user(user.email, order.id)

最近业务反馈:订单处理延迟从原来的 2 秒涨到了 20 秒。管理层很着急,要求“尽快优化 Python 代码”。

团队第一反应是改代码:

# 有人建议:把 for 循环改成列表推导式
results = [handle_order(order) for order in orders]

也有人建议:

# 有人建议:拆函数、减少对象创建

还有人说:

# 要不要上 asyncio?

这些建议不一定错,但它们都有一个共同问题:没有证据。

在没有定位瓶颈之前,优化就是猜谜。


二、第一步:先用 cProfile 看 Python 时间花在哪里

我会先把主流程包起来,用 cProfile 做一次粗粒度画像。

import cProfile
import pstats

def run():
    process_orders()

profiler = cProfile.Profile()
profiler.enable()
run()
profiler.disable()

pstats.Stats(profiler).sort_stats("cumtime").print_stats(20)

这里重点看 cumtime,也就是函数及其子函数累计耗时。

假设输出类似这样:

ncalls  tottime  percall  cumtime  percall filename:lineno(function)
1       0.002    0.002    21.435   21.435 orders.py:10(process_orders)
500     0.010    0.000    8.921    0.018  db.py:21(get_user)
500     0.013    0.000    7.832    0.016  db.py:35(get_inventory)
500     0.008    0.000    3.102    0.006  notify.py:12(notify_user)
500     0.004    0.000    1.215    0.002  db.py:56(update_inventory)

这个结果已经很有信息量。

process_orders 总耗时 21 秒,但它自己真正执行 Python 逻辑的时间几乎没有多少。大量时间都消耗在:

  • get_user
  • get_inventory
  • notify_user
  • update_inventory

这些函数背后大概率是数据库查询、网络请求、第三方服务调用。

这时我会马上阻止团队继续讨论“Python 循环怎么写更快”。因为现在最明显的信号是:

Python 不是最大嫌疑人,I/O 才是。


三、第二步:拆开数据库和网络耗时

cProfile 能告诉我们函数慢,但不一定能告诉我们具体是数据库慢、网络慢、还是序列化慢。

下一步,我会给关键外部调用加轻量级计时日志。

import time
import logging
from functools import wraps

logging.basicConfig(level=logging.INFO)

def trace_time(name):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                return func(*args, **kwargs)
            finally:
                cost = time.perf_counter() - start
                logging.info("%s cost %.4fs", name, cost)
        return wrapper
    return decorator

然后包住几个高风险函数:

@trace_time("db.get_user")
def get_user(user_id):
    return db.query(User).filter(User.id == user_id).first()

@trace_time("db.get_inventory")
def get_inventory(sku_id):
    return db.query(Inventory).filter(Inventory.sku_id == sku_id).first()

@trace_time("network.notify_user")
def notify_user(email, order_id):
    return requests.post(
        "https://notification.example.com/send",
        json={"email": email, "order_id": order_id},
        timeout=3,
    )

再跑一次,日志可能变成这样:

db.get_user cost 0.0181s
db.get_inventory cost 0.0164s
network.notify_user cost 0.0068s
db.get_user cost 0.0175s
db.get_inventory cost 0.0159s
network.notify_user cost 0.0065s
...

单次看起来都不慢,十几毫秒而已。但如果有 500 个订单,每个订单查两三次数据库,就会变成灾难。

这就是典型的 N+1 查询问题


四、第三步:识别 N+1 查询,而不是优化 Python 循环

原来的代码是这样的:

orders = get_pending_orders()

for order in orders:
    user = get_user(order.user_id)
    inventory = get_inventory(order.sku_id)

如果有 500 个订单:

1 次查询订单
500 次查询用户
500 次查询库存

总共 1001 次数据库访问。

即使每次查询只有 15ms,累计起来也是:

1000 * 15ms = 15s

这还没算网络抖动、数据库连接池等待、索引缺失、锁竞争。

这时,如果团队还在优化下面这种东西:

for order in orders:
    handle(order)

改成:

list(map(handle, orders))

本质上没有意义。

因为真正慢的不是 for,而是 for 里面每次都去访问数据库。


五、第四步:批量查询,减少 I/O 次数

正确方向是减少数据库往返次数。

可以先收集所有 user_idsku_id

def process_orders():
    orders = get_pending_orders()

    user_ids = {order.user_id for order in orders}
    sku_ids = {order.sku_id for order in orders}

    users = get_users_by_ids(user_ids)
    inventories = get_inventories_by_sku_ids(sku_ids)

    user_map = {user.id: user for user in users}
    inventory_map = {item.sku_id: item for item in inventories}

    for order in orders:
        user = user_map.get(order.user_id)
        inventory = inventory_map.get(order.sku_id)

        if user and inventory and inventory.stock > 0:
            create_shipment(order)
            update_inventory(order.sku_id, -1)
            notify_user(user.email, order.id)

对应数据库查询:

def get_users_by_ids(user_ids):
    return db.query(User).filter(User.id.in_(user_ids)).all()

def get_inventories_by_sku_ids(sku_ids):
    return db.query(Inventory).filter(Inventory.sku_id.in_(sku_ids)).all()

这样查询次数从:

1 + 500 + 500

变成:

1 + 1 + 1

这类优化往往比“把 Python 写得更花哨”有效得多。


六、第五步:继续排查网络慢,避免同步阻塞

假设数据库优化后,耗时从 21 秒降到 7 秒。继续用 profile 和日志看,发现 notify_user 占了大头。

原来每个订单都同步通知用户:

for order in orders:
    notify_user(user.email, order.id)

如果通知服务偶尔慢,每次 50ms,500 个订单就是 25 秒的潜在风险。

这类场景要问一个关键问题:

通知必须在订单主流程里同步完成吗?

如果不必须,应该异步化或队列化。

例如把通知任务写入消息队列:

def enqueue_notification(email, order_id):
    message_queue.publish({
        "type": "order_created",
        "email": email,
        "order_id": order_id,
    })

主流程中只负责投递任务:

for order in orders:
    enqueue_notification(user.email, order.id)

由后台 worker 慢慢消费:

def notification_worker():
    while True:
        message = message_queue.consume()
        notify_user(message["email"], message["order_id"])

这不是单纯的代码优化,而是架构优化:把外部不稳定依赖从主链路中拆出去。


七、第六步:如果必须并发,才考虑 asyncio

很多团队一听到网络慢,就立刻说“上 asyncio”。但我通常会先判断:

  1. 当前慢的是不是 I/O?
  2. 调用是否可以并发?
  3. 使用的库是否支持异步?
  4. 并发是否会压垮下游服务?
  5. 是否有超时、限流、重试和熔断?

如果通知服务必须实时调用,可以用异步并发,但要控制并发量。

import asyncio
import aiohttp

async def notify_user_async(session, email, order_id):
    async with session.post(
        "https://notification.example.com/send",
        json={"email": email, "order_id": order_id},
        timeout=3,
    ) as response:
        return await response.text()

async def notify_all(users_orders):
    connector = aiohttp.TCPConnector(limit=20)

    async with aiohttp.ClientSession(connector=connector) as session:
        tasks = [
            notify_user_async(session, email, order_id)
            for email, order_id in users_orders
        ]
        return await asyncio.gather(*tasks, return_exceptions=True)

注意这里的 limit=20 很关键。高级工程师不会为了追求并发而无脑把 500 个请求同时打出去。那不是优化,是制造事故。


八、数据库慢,还要继续看索引和 SQL

如果日志显示单次数据库查询就很慢,比如:

db.get_inventory cost 1.2387s

那就不是 N+1 这么简单了,可能是 SQL 本身慢。

要继续查看:

EXPLAIN ANALYZE
SELECT *
FROM inventory
WHERE sku_id = 'SKU_123';

如果发现全表扫描:

Seq Scan on inventory
Filter: sku_id = 'SKU_123'

那就应该加索引:

CREATE INDEX idx_inventory_sku_id ON inventory(sku_id);

对于 ORM 项目,也不能只看 Python 代码。ORM 很方便,但它生成的 SQL 不一定总是你以为的样子。

例如 SQLAlchemy 可以打开 SQL 日志:

engine = create_engine(
    DATABASE_URL,
    echo=True,
    pool_size=10,
    max_overflow=20,
)

Django 可以查看查询数量:

from django.db import connection

print(len(connection.queries))
for query in connection.queries[:10]:
    print(query["sql"], query["time"])

很多所谓“Python 慢”,最后都会变成:

查询太多
索引缺失
连接池不够
事务太长
锁等待严重
下游服务超时

九、一张排查流程图:先定位,再优化

可以把排查路径简化成这样:

用户反馈慢
   |
   v
确认慢的接口/任务/时间段
   |
   v
加整体耗时日志
   |
   v
使用 cProfile / APM / trace 定位热点
   |
   v
判断瓶颈类型
   |
   +--> CPU 密集:算法、数据结构、缓存、C 扩展、多进程
   |
   +--> 数据库慢:SQL、索引、N+1、连接池、锁、事务
   |
   +--> 网络慢:超时、重试、并发、队列、下游 SLA
   |
   +--> 文件/磁盘慢:批处理、缓冲、压缩、存储策略
   |
   v
小步修改
   |
   v
压测验证
   |
   v
上线观察

这张图的核心思想是:不要用经验代替证据。


十、如何向管理层解释:我们不是不优化,而是在避免误优化

当管理层催优化时,工程师最怕陷入两个极端。

一种是立刻承诺:

我们马上重构,三天内搞定。

另一种是技术化逃避:

这个系统很复杂,要慢慢看。

更好的表达方式是:

我们已经开始定位性能瓶颈。当前不会直接改业务代码,因为没有证据表明 Python 逻辑是主因。我们会先完成三件事:第一,记录接口整体耗时;第二,拆分数据库、网络和 Python 内部计算耗时;第三,找出最耗时的前几个调用点。定位完成后,再针对性优化,避免花时间改错地方。

这句话很重要。它传达了三个信息:

  1. 你在行动;
  2. 你有方法;
  3. 你在控制风险。

高级工程师的价值,不只是“写代码快”,而是能把混乱问题变成清晰路径。


十一、最佳实践清单:排查“假 Python 慢”的实用步骤

1. 先记录整体耗时

start = time.perf_counter()
process_orders()
print(f"total cost: {time.perf_counter() - start:.4f}s")

不要一开始就陷入局部细节。


2. 用 cProfile 找累计耗时

pstats.Stats(profiler).sort_stats("cumtime").print_stats(20)

优先看 cumtime,因为很多慢函数本身不慢,慢在它调用的下游。


3. 对外部依赖单独打点

数据库、Redis、HTTP、文件系统、消息队列都应该单独计时。

@trace_time("redis.get")
def get_cache(key):
    return redis_client.get(key)

4. 统计调用次数

慢不一定是单次慢,也可能是调用太多。

from collections import Counter

counter = Counter()

def trace_count(name):
    def decorator(func):
        def wrapper(*args, **kwargs):
            counter[name] += 1
            return func(*args, **kwargs)
        return wrapper
    return decorator

最后输出:

print(counter)

如果你看到:

get_user: 500
get_inventory: 500

就要警觉 N+1。


5. 设置超时,不要无限等待

requests.get(url, timeout=(1, 3))

这里 (1, 3) 表示连接超时 1 秒,读取超时 3 秒。没有超时的网络调用,是线上系统的隐形炸弹。


6. 优先减少 I/O 次数

批量查询、缓存、预加载、队列化,通常比微优化 Python 语法更有效。


7. 优化后必须复测

优化不是“我觉得快了”,而是数据对比。

优化前:平均 21.4s,P95 28.7s
优化后:平均 3.2s,P95 5.1s

最好保留一张简单对比表:

阶段 平均耗时 P95 耗时 主要瓶颈
初始版本 21.4s 28.7s N+1 查询、同步通知
批量查询后 7.2s 9.8s 网络通知
队列化通知后 3.2s 5.1s 少量数据库写入

十二、高级工程师的价值:找对问题,比写快代码更重要

初级工程师看到慢,可能会说:

我要优化这段代码。

成熟工程师会问:

慢在哪里?证据是什么?影响面多大?改哪里收益最高?会不会引入新风险?

高级工程师则会进一步考虑:

这个问题为什么会发生?
我们的系统有没有观测能力?
有没有性能基线?
有没有上线前压测?
有没有防止 N+1 的代码审查机制?
有没有下游超时和降级策略?

真正的能力差距,就藏在这些问题里。

因为软件系统里的“慢”,常常不是某一行代码导致的,而是多个因素叠加:

数据量增长
索引缺失
调用次数膨胀
连接池耗尽
下游服务抖动
日志过多
缓存失效
架构耦合

如果没有系统化排查能力,团队就会陷入“哪里看起来像问题,就改哪里”的循环。

而这正是最昂贵的浪费。


十三、结语:性能优化的第一原则,是尊重事实

Python 并不完美,它确实有性能边界。但在大量业务系统中,真正拖慢服务的,往往不是 Python 解释器,而是数据库、网络、磁盘、锁、队列和架构设计。

当你下一次听到“Python 太慢了”时,不妨先停下来,打开 profiler,打上耗时日志,看一眼 SQL,查一下网络调用。

不要急着证明自己能写更快的代码。

先证明你能找对问题。

这才是高级工程师最稀缺、也最有价值的能力。

更多推荐