别急着优化 Python:一次“代码很慢,其实是数据库/网络慢”的排查复盘
别急着优化 Python:一次“代码很慢,其实是数据库/网络慢”的排查复盘
管理层催优化,业务方说页面卡,团队群里已经开始热火朝天地讨论:“是不是 Python 太慢?”“要不要把循环改成列表推导式?”“是不是该上异步?”“要不要重构订单模块?”
这些声音都很熟悉。作为工程师,我们很容易把“慢”理解成“代码慢”,然后立刻钻进函数、循环、对象创建、字符串拼接里寻找答案。但高级工程师真正的价值,往往不在于第一时间写出更复杂的优化代码,而在于先问一句:
慢,到底慢在哪里?
这篇文章讲一个我会如何排查“看起来像 Python 慢,其实是数据库/网络慢”的真实工作型案例。重点不是炫技,而是给出一套可落地的方法:如何定位瓶颈、如何说服团队停下盲目优化、如何用数据推动正确决策。
一、场景:订单处理系统突然变慢
假设我们维护一个订单处理服务,核心逻辑大致如下:
def process_orders():
orders = get_pending_orders()
for order in orders:
user = get_user(order.user_id)
inventory = get_inventory(order.sku_id)
if inventory.stock > 0:
create_shipment(order)
update_inventory(order.sku_id, -1)
notify_user(user.email, order.id)
最近业务反馈:订单处理延迟从原来的 2 秒涨到了 20 秒。管理层很着急,要求“尽快优化 Python 代码”。
团队第一反应是改代码:
# 有人建议:把 for 循环改成列表推导式
results = [handle_order(order) for order in orders]
也有人建议:
# 有人建议:拆函数、减少对象创建
还有人说:
# 要不要上 asyncio?
这些建议不一定错,但它们都有一个共同问题:没有证据。
在没有定位瓶颈之前,优化就是猜谜。
二、第一步:先用 cProfile 看 Python 时间花在哪里
我会先把主流程包起来,用 cProfile 做一次粗粒度画像。
import cProfile
import pstats
def run():
process_orders()
profiler = cProfile.Profile()
profiler.enable()
run()
profiler.disable()
pstats.Stats(profiler).sort_stats("cumtime").print_stats(20)
这里重点看 cumtime,也就是函数及其子函数累计耗时。
假设输出类似这样:
ncalls tottime percall cumtime percall filename:lineno(function)
1 0.002 0.002 21.435 21.435 orders.py:10(process_orders)
500 0.010 0.000 8.921 0.018 db.py:21(get_user)
500 0.013 0.000 7.832 0.016 db.py:35(get_inventory)
500 0.008 0.000 3.102 0.006 notify.py:12(notify_user)
500 0.004 0.000 1.215 0.002 db.py:56(update_inventory)
这个结果已经很有信息量。
process_orders 总耗时 21 秒,但它自己真正执行 Python 逻辑的时间几乎没有多少。大量时间都消耗在:
get_userget_inventorynotify_userupdate_inventory
这些函数背后大概率是数据库查询、网络请求、第三方服务调用。
这时我会马上阻止团队继续讨论“Python 循环怎么写更快”。因为现在最明显的信号是:
Python 不是最大嫌疑人,I/O 才是。
三、第二步:拆开数据库和网络耗时
cProfile 能告诉我们函数慢,但不一定能告诉我们具体是数据库慢、网络慢、还是序列化慢。
下一步,我会给关键外部调用加轻量级计时日志。
import time
import logging
from functools import wraps
logging.basicConfig(level=logging.INFO)
def trace_time(name):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
start = time.perf_counter()
try:
return func(*args, **kwargs)
finally:
cost = time.perf_counter() - start
logging.info("%s cost %.4fs", name, cost)
return wrapper
return decorator
然后包住几个高风险函数:
@trace_time("db.get_user")
def get_user(user_id):
return db.query(User).filter(User.id == user_id).first()
@trace_time("db.get_inventory")
def get_inventory(sku_id):
return db.query(Inventory).filter(Inventory.sku_id == sku_id).first()
@trace_time("network.notify_user")
def notify_user(email, order_id):
return requests.post(
"https://notification.example.com/send",
json={"email": email, "order_id": order_id},
timeout=3,
)
再跑一次,日志可能变成这样:
db.get_user cost 0.0181s
db.get_inventory cost 0.0164s
network.notify_user cost 0.0068s
db.get_user cost 0.0175s
db.get_inventory cost 0.0159s
network.notify_user cost 0.0065s
...
单次看起来都不慢,十几毫秒而已。但如果有 500 个订单,每个订单查两三次数据库,就会变成灾难。
这就是典型的 N+1 查询问题。
四、第三步:识别 N+1 查询,而不是优化 Python 循环
原来的代码是这样的:
orders = get_pending_orders()
for order in orders:
user = get_user(order.user_id)
inventory = get_inventory(order.sku_id)
如果有 500 个订单:
1 次查询订单
500 次查询用户
500 次查询库存
总共 1001 次数据库访问。
即使每次查询只有 15ms,累计起来也是:
1000 * 15ms = 15s
这还没算网络抖动、数据库连接池等待、索引缺失、锁竞争。
这时,如果团队还在优化下面这种东西:
for order in orders:
handle(order)
改成:
list(map(handle, orders))
本质上没有意义。
因为真正慢的不是 for,而是 for 里面每次都去访问数据库。
五、第四步:批量查询,减少 I/O 次数
正确方向是减少数据库往返次数。
可以先收集所有 user_id 和 sku_id:
def process_orders():
orders = get_pending_orders()
user_ids = {order.user_id for order in orders}
sku_ids = {order.sku_id for order in orders}
users = get_users_by_ids(user_ids)
inventories = get_inventories_by_sku_ids(sku_ids)
user_map = {user.id: user for user in users}
inventory_map = {item.sku_id: item for item in inventories}
for order in orders:
user = user_map.get(order.user_id)
inventory = inventory_map.get(order.sku_id)
if user and inventory and inventory.stock > 0:
create_shipment(order)
update_inventory(order.sku_id, -1)
notify_user(user.email, order.id)
对应数据库查询:
def get_users_by_ids(user_ids):
return db.query(User).filter(User.id.in_(user_ids)).all()
def get_inventories_by_sku_ids(sku_ids):
return db.query(Inventory).filter(Inventory.sku_id.in_(sku_ids)).all()
这样查询次数从:
1 + 500 + 500
变成:
1 + 1 + 1
这类优化往往比“把 Python 写得更花哨”有效得多。
六、第五步:继续排查网络慢,避免同步阻塞
假设数据库优化后,耗时从 21 秒降到 7 秒。继续用 profile 和日志看,发现 notify_user 占了大头。
原来每个订单都同步通知用户:
for order in orders:
notify_user(user.email, order.id)
如果通知服务偶尔慢,每次 50ms,500 个订单就是 25 秒的潜在风险。
这类场景要问一个关键问题:
通知必须在订单主流程里同步完成吗?
如果不必须,应该异步化或队列化。
例如把通知任务写入消息队列:
def enqueue_notification(email, order_id):
message_queue.publish({
"type": "order_created",
"email": email,
"order_id": order_id,
})
主流程中只负责投递任务:
for order in orders:
enqueue_notification(user.email, order.id)
由后台 worker 慢慢消费:
def notification_worker():
while True:
message = message_queue.consume()
notify_user(message["email"], message["order_id"])
这不是单纯的代码优化,而是架构优化:把外部不稳定依赖从主链路中拆出去。
七、第六步:如果必须并发,才考虑 asyncio
很多团队一听到网络慢,就立刻说“上 asyncio”。但我通常会先判断:
- 当前慢的是不是 I/O?
- 调用是否可以并发?
- 使用的库是否支持异步?
- 并发是否会压垮下游服务?
- 是否有超时、限流、重试和熔断?
如果通知服务必须实时调用,可以用异步并发,但要控制并发量。
import asyncio
import aiohttp
async def notify_user_async(session, email, order_id):
async with session.post(
"https://notification.example.com/send",
json={"email": email, "order_id": order_id},
timeout=3,
) as response:
return await response.text()
async def notify_all(users_orders):
connector = aiohttp.TCPConnector(limit=20)
async with aiohttp.ClientSession(connector=connector) as session:
tasks = [
notify_user_async(session, email, order_id)
for email, order_id in users_orders
]
return await asyncio.gather(*tasks, return_exceptions=True)
注意这里的 limit=20 很关键。高级工程师不会为了追求并发而无脑把 500 个请求同时打出去。那不是优化,是制造事故。
八、数据库慢,还要继续看索引和 SQL
如果日志显示单次数据库查询就很慢,比如:
db.get_inventory cost 1.2387s
那就不是 N+1 这么简单了,可能是 SQL 本身慢。
要继续查看:
EXPLAIN ANALYZE
SELECT *
FROM inventory
WHERE sku_id = 'SKU_123';
如果发现全表扫描:
Seq Scan on inventory
Filter: sku_id = 'SKU_123'
那就应该加索引:
CREATE INDEX idx_inventory_sku_id ON inventory(sku_id);
对于 ORM 项目,也不能只看 Python 代码。ORM 很方便,但它生成的 SQL 不一定总是你以为的样子。
例如 SQLAlchemy 可以打开 SQL 日志:
engine = create_engine(
DATABASE_URL,
echo=True,
pool_size=10,
max_overflow=20,
)
Django 可以查看查询数量:
from django.db import connection
print(len(connection.queries))
for query in connection.queries[:10]:
print(query["sql"], query["time"])
很多所谓“Python 慢”,最后都会变成:
查询太多
索引缺失
连接池不够
事务太长
锁等待严重
下游服务超时
九、一张排查流程图:先定位,再优化
可以把排查路径简化成这样:
用户反馈慢
|
v
确认慢的接口/任务/时间段
|
v
加整体耗时日志
|
v
使用 cProfile / APM / trace 定位热点
|
v
判断瓶颈类型
|
+--> CPU 密集:算法、数据结构、缓存、C 扩展、多进程
|
+--> 数据库慢:SQL、索引、N+1、连接池、锁、事务
|
+--> 网络慢:超时、重试、并发、队列、下游 SLA
|
+--> 文件/磁盘慢:批处理、缓冲、压缩、存储策略
|
v
小步修改
|
v
压测验证
|
v
上线观察
这张图的核心思想是:不要用经验代替证据。
十、如何向管理层解释:我们不是不优化,而是在避免误优化
当管理层催优化时,工程师最怕陷入两个极端。
一种是立刻承诺:
我们马上重构,三天内搞定。
另一种是技术化逃避:
这个系统很复杂,要慢慢看。
更好的表达方式是:
我们已经开始定位性能瓶颈。当前不会直接改业务代码,因为没有证据表明 Python 逻辑是主因。我们会先完成三件事:第一,记录接口整体耗时;第二,拆分数据库、网络和 Python 内部计算耗时;第三,找出最耗时的前几个调用点。定位完成后,再针对性优化,避免花时间改错地方。
这句话很重要。它传达了三个信息:
- 你在行动;
- 你有方法;
- 你在控制风险。
高级工程师的价值,不只是“写代码快”,而是能把混乱问题变成清晰路径。
十一、最佳实践清单:排查“假 Python 慢”的实用步骤
1. 先记录整体耗时
start = time.perf_counter()
process_orders()
print(f"total cost: {time.perf_counter() - start:.4f}s")
不要一开始就陷入局部细节。
2. 用 cProfile 找累计耗时
pstats.Stats(profiler).sort_stats("cumtime").print_stats(20)
优先看 cumtime,因为很多慢函数本身不慢,慢在它调用的下游。
3. 对外部依赖单独打点
数据库、Redis、HTTP、文件系统、消息队列都应该单独计时。
@trace_time("redis.get")
def get_cache(key):
return redis_client.get(key)
4. 统计调用次数
慢不一定是单次慢,也可能是调用太多。
from collections import Counter
counter = Counter()
def trace_count(name):
def decorator(func):
def wrapper(*args, **kwargs):
counter[name] += 1
return func(*args, **kwargs)
return wrapper
return decorator
最后输出:
print(counter)
如果你看到:
get_user: 500
get_inventory: 500
就要警觉 N+1。
5. 设置超时,不要无限等待
requests.get(url, timeout=(1, 3))
这里 (1, 3) 表示连接超时 1 秒,读取超时 3 秒。没有超时的网络调用,是线上系统的隐形炸弹。
6. 优先减少 I/O 次数
批量查询、缓存、预加载、队列化,通常比微优化 Python 语法更有效。
7. 优化后必须复测
优化不是“我觉得快了”,而是数据对比。
优化前:平均 21.4s,P95 28.7s
优化后:平均 3.2s,P95 5.1s
最好保留一张简单对比表:
| 阶段 | 平均耗时 | P95 耗时 | 主要瓶颈 |
|---|---|---|---|
| 初始版本 | 21.4s | 28.7s | N+1 查询、同步通知 |
| 批量查询后 | 7.2s | 9.8s | 网络通知 |
| 队列化通知后 | 3.2s | 5.1s | 少量数据库写入 |
十二、高级工程师的价值:找对问题,比写快代码更重要
初级工程师看到慢,可能会说:
我要优化这段代码。
成熟工程师会问:
慢在哪里?证据是什么?影响面多大?改哪里收益最高?会不会引入新风险?
高级工程师则会进一步考虑:
这个问题为什么会发生?
我们的系统有没有观测能力?
有没有性能基线?
有没有上线前压测?
有没有防止 N+1 的代码审查机制?
有没有下游超时和降级策略?
真正的能力差距,就藏在这些问题里。
因为软件系统里的“慢”,常常不是某一行代码导致的,而是多个因素叠加:
数据量增长
索引缺失
调用次数膨胀
连接池耗尽
下游服务抖动
日志过多
缓存失效
架构耦合
如果没有系统化排查能力,团队就会陷入“哪里看起来像问题,就改哪里”的循环。
而这正是最昂贵的浪费。
十三、结语:性能优化的第一原则,是尊重事实
Python 并不完美,它确实有性能边界。但在大量业务系统中,真正拖慢服务的,往往不是 Python 解释器,而是数据库、网络、磁盘、锁、队列和架构设计。
当你下一次听到“Python 太慢了”时,不妨先停下来,打开 profiler,打上耗时日志,看一眼 SQL,查一下网络调用。
不要急着证明自己能写更快的代码。
先证明你能找对问题。
这才是高级工程师最稀缺、也最有价值的能力。
更多推荐
所有评论(0)