1. 项目概述:为什么装饰器在多进程里会“失联”?

你写好了一个漂亮的装饰器,给函数加上计时、日志、性能追踪功能,一切运行如丝般顺滑——直到你把它放进 multiprocessing.Pool 里。啪,报错: AttributeError: Can't pickle local object 'track_execution_time.<locals>.wrapper' 。不是代码逻辑错了,不是环境配错了,是 Python 在底层悄悄给你设了一道“门禁”。这道门,叫 Pickle

我第一次遇到这个问题是在给一个文本预处理流水线加实时执行热力图时。用的是 heartrate 库,它需要每个子进程绑定独立端口启动 Web 服务。但只要一加 @track_execution_time 这个装饰器, Pool.map() 就直接崩。查日志、翻文档、改初始化方式,折腾两小时才意识到:问题根本不在 heartrate ,也不在 multiprocessing ,而在于 Python 序列化机制对“函数身份”的严苛定义。这不是 bug,是设计哲学——Python 不保存函数的字节码,只存它的“全名路径”,靠这个路径在反序列化时重新 import 回来。而装饰器生成的 wrapper 函数,天生就是“匿名居民”:它被定义在 track_execution_time 的局部作用域里,没有全局变量名, __qualname__ track_execution_time.<locals>.wrapper ,这种路径在子进程中根本找不到对应实体。

关键词“Towards AI - Medium”在这里不是平台标签,而是提醒我们:这篇文章的原始语境,是面向数据工程师和算法工程师的实战笔记,不是理论论文。它不讲“什么是 Pickle”,而是直击“为什么我的装饰器进不了 Pool”。所以本文也延续这个风格:不堆砌术语,不复述文档,只说你调试时真正卡住的那几行报错、你删掉 @wraps 后看到的诡异行为、你手动赋值 __qualname__ 时灵光一闪的顿悟。我会把 heartrate 的端口冲突、 fork spawn 的内存差异、甚至 Ctrl+C 失效这种 IDE 级别的玄学问题,全部拆解成可验证、可复现、可抄作业的操作细节。如果你正被 Can't pickle 报错困在工位上,这篇就是为你写的“破壁指南”。

2. 核心原理拆解:Pickle 不是存代码,是存“寻址指令”

2.1 Pickle 的本质:一份跨进程的“快递单”

很多人误以为 pickle 是把函数的代码“打包带走”。错。它更像一张快递单:上面只写明了“收件人地址”(模块名 + 函数名),不附带任何货物(函数体)。当子进程收到这张单子,它做的第一件事不是执行,而是去自己的 Python 环境里,按地址找人—— import 模块,然后 getattr(module, function_name) 拿到那个函数对象。

我们用最简例子验证:

import pickle

def add(x, y):
    return x + y

# 序列化:只记录“add”这个名字,以及它属于当前模块
pickled = pickle.dumps(add)

# 关键一步:人为制造“地址失效”
del globals()['add']  # 把收件人从通讯录里删掉

# 反序列化:子进程(或当前进程)尝试按单子找人
try:
    loaded_add = pickle.loads(pickled)
    print(loaded_add(2, 3))
except AttributeError as e:
    print("报错:", e)  # 输出:Can't get attribute 'add' on <module '__main__' ...>

看清楚了? pickle.dumps(add) 成功了,因为 add 当时还在全局命名空间里, __qualname__ 就是 'add' ,地址清晰。但 del globals()['add'] 之后, pickle.loads() 就找不到人了。它不关心你原来 add 是加法还是减法,它只认地址簿上的名字。这就是 Can't get attribute 的根源——不是代码丢了,是门牌号被抹掉了。

2.2 装饰器如何“伪造”了门牌号?

再看装饰器的典型结构:

def track_execution_time(func):
    def wrapper(*args, **kwargs):  # ← 这个 wrapper 是局部变量!
        start = time.time()
        result = func(*args, **kwargs)
        end = time.time()
        print(f"耗时: {end - start}")
        return result
    return wrapper  # ← 返回的是局部函数对象,没有全局名

@track_execution_time
def meaningless_task(text):
    return len(text)

执行 @track_execution_time 时,Python 做了两件事:

  1. 调用 track_execution_time(meaningless_task)
  2. 把返回值(即 wrapper 函数对象)赋给 meaningless_task 这个名字。

关键点来了: wrapper 这个函数对象,是在 track_execution_time 的函数体内 def 出来的。它在 track_execution_time 执行完后,就脱离了其作用域。虽然 meaningless_task 这个名字现在指向它,但它的 __qualname__ 属性,Python 解释器自动设为 'track_execution_time.<locals>.wrapper' —— 这是一个“内部地址”,就像“北京市朝阳区某大厦3层B座-张三的临时工位”,不是正式注册的办公地址。

multiprocessing 需要将 meaningless_task 发送给子进程时,它调用 pickle.dumps(meaningless_task) 。Pickle 查 meaningless_task.__qualname__ ,得到 'track_execution_time.<locals>.wrapper' ,于是试图在子进程中执行 from __main__ import track_execution_time; getattr(track_execution_time, '<locals>.wrapper') 。但 track_execution_time 是个函数,不是模块,它内部根本没有一个叫 <locals>.wrapper 的公开属性。子进程懵了,抛出 Can't pickle local object

提示:你可以随时用 print(meaningless_task.__qualname__) 验证。未加 @wraps 时,输出是 track_execution_time.<locals>.wrapper ;加了 @wraps(func) 后,输出变成 meaningless_task —— 地址变了,门牌号换成了正式注册名。

2.3 functools.wraps 干了什么?不只是复制文档字符串

@wraps(func) 常被简单理解为“把原函数的 __doc__ 复制过来”。这是对的,但远远不够。它的核心动作,是重写被装饰函数的 __qualname__ __module__ __name__ 等关键元信息,让 wrapper “冒充”成原函数本身。

functools.wraps 的源码本质是调用 update_wrapper ,它默认复制以下属性(来自 WRAPPER_ASSIGNMENTS ):

  • __module__ :函数定义所在的模块名(如 '__main__'
  • __name__ :函数的原始名称(如 'meaningless_task'
  • __qualname__ :函数的限定名称(如 'meaningless_task' ,对顶层函数和 __name__ 相同)
  • __annotations__ :类型注解字典
  • __doc__ :文档字符串
  • __type_params__ :类型参数(Python 3.12+)

其中, __qualname__ 是 Pickle 的命门。 @wraps(func) 的等价手动操作就是:

def track_execution_time(func):
    def wrapper(*args, **kwargs):
        # ... 功能代码 ...
        return result
    # 手动覆盖关键元信息,让 wrapper “看起来像” func
    wrapper.__module__ = func.__module__
    wrapper.__name__ = func.__name__
    wrapper.__qualname__ = func.__qualname__  # ← 这一行最关键!
    wrapper.__doc__ = func.__doc__
    return wrapper

执行后, meaningless_task.__qualname__ 就从 track_execution_time.<locals>.wrapper 变成了 meaningless_task 。Pickle 再序列化时,就只记录 'meaningless_task' 这个干净的名字。子进程收到后,直接 from __main__ import meaningless_task ,完美加载——因为 meaningless_task 这个名字,在子进程启动时,已经被 initializer spawn 机制重新执行了一遍装饰逻辑,它指向的正是那个 wrapper

注意: @wraps 不改变函数的行为,只改变它的“身份证信息”。 wrapper 的代码体、闭包变量(如 func 引用)依然存在,只是对外宣称自己叫 meaningless_task

3. 实操过程与核心环节实现:从报错到稳定运行的完整链路

3.1 复现原始报错:精准定位故障点

不要跳过这一步。亲手触发错误,是理解问题的开始。新建文件 debug_pickle.py ,粘贴原始代码(去掉 heartrate 相关部分,聚焦核心):

import os
from multiprocessing import Pool
import time
from functools import wraps

# 1. 定义一个“有问题”的装饰器(故意不加 @wraps)
def track_execution_time_broken(func):
    def wrapper(*args, **kwargs):
        start_time = time.time()
        print(f"[{os.getpid()}] Starting task at {start_time:.2f}")
        result = func(*args, **kwargs)
        end_time = time.time()
        print(f"[{os.getpid()}] Ending task at {end_time:.2f}, duration: {end_time - start_time:.2f}s")
        return result
    return wrapper  # ← 没有 @wraps,wrapper.__qualname__ 是非法的

# 2. 用这个装饰器装饰工作函数
@track_execution_time_broken
def meaningless_task(dummy_text_partial):
    words = dummy_text_partial.split()
    lorem_count = sum(1 for word in words if word.lower() == "lorem")
    for i in range(2):  # 缩短等待时间,便于调试
        time.sleep(0.5)
    return lorem_count

# 3. 主函数:启动 Pool
def main():
    dummy_text = "Lorem ipsum dolor sit amet, consectetur Lorem adipiscing Lorem elit"
    try:
        with Pool(processes=2) as pool:
            results = pool.map(meaningless_task, dummy_text.split(","))
            print("Word count results:", results)
    except Exception as e:
        print("捕获到异常:", type(e).__name__, str(e))
        # 手动打印关键属性,确认问题
        print("meaningless_task.__qualname__ =", meaningless_task.__qualname__)
        print("meaningless_task.__module__ =", meaningless_task.__module__)

if __name__ == "__main__":
    main()

运行 python debug_pickle.py ,你会看到:

捕获到异常: AttributeError Can't pickle local object 'track_execution_time_broken.<locals>.wrapper'
meaningless_task.__qualname__ = track_execution_time_broken.<locals>.wrapper
meaningless_task.__module__ = __main__

完美复现! __qualname__ 的非法路径暴露无遗。这是你的“案发现场”。

3.2 修复方案一: @wraps(func) —— 标准且推荐

修改装饰器,加上 @wraps(func)

from functools import wraps  # 确保已导入

def track_execution_time_fixed(func):
    @wraps(func)  # ← 加上这一行!
    def wrapper(*args, **kwargs):
        start_time = time.time()
        print(f"[{os.getpid()}] Starting task at {start_time:.2f}")
        result = func(*args, **kwargs)
        end_time = time.time()
        print(f"[{os.getpid()}] Ending task at {end_time:.2f}, duration: {end_time - start_time:.2f}s")
        return result
    return wrapper

@track_execution_time_fixed
def meaningless_task(dummy_text_partial):
    # ... 同上 ...

再次运行,报错消失,输出正常:

[12345] Starting task at 1712345678.12
[12345] Ending task at 1712345679.12, duration: 1.00s
[12346] Starting task at 1712345678.13
[12346] Ending task at 1712345679.13, duration: 1.00s
Word count results: [1, 2]

此时检查 meaningless_task.__qualname__ ,输出是 meaningless_task ,合法。

3.3 修复方案二:手动赋值 __qualname__ —— 理解原理的捷径

如果你想彻底搞懂 @wraps 在做什么,可以不用它,手动设置:

def track_execution_time_manual(func):
    def wrapper(*args, **kwargs):
        start_time = time.time()
        print(f"[{os.getpid()}] Starting task at {start_time:.2f}")
        result = func(*args, **kwargs)
        end_time = time.time()
        print(f"[{os.getpid()}] Ending task at {end_time:.2f}, duration: {end_time - start_time:.2f}s")
        return result
    # 手动复制关键属性(最小化,只做 Pickle 必需的)
    wrapper.__module__ = func.__module__
    wrapper.__name__ = func.__name__
    wrapper.__qualname__ = func.__qualname__  # ← Pickle 最需要的
    # wrapper.__doc__ = func.__doc__  # 可选,如果需要文档
    return wrapper

效果完全一样。这证明了 @wraps 的核心价值就是 __qualname__ 的重写。其他属性(如 __doc__ )是锦上添花,让装饰后的函数在 IDE 提示、 help() 中表现得更“原生”。

3.4 集成 heartrate :解决端口冲突的实战技巧

现在回到原始需求:为每个子进程启动独立的 heartrate 服务。 heartrate 需要唯一端口,而 initializer 函数在每个子进程中都会执行一次。原始代码用 os.getpid() 计算端口是合理的,但要注意两点:

  1. 端口范围安全 port_base + process_id % 10000 可能产生小于 1024 的端口(需要 root 权限)或大于 65535 的端口(无效)。应做校验:
def initialize_worker():
    process_id = os.getpid()
    # 生成 8000-8999 范围内的端口,安全且易记
    port = 8000 + (process_id % 1000)
    print(f"Tracing on port {port} for process {process_id}")
    heartrate.trace(browser=True, port=port)
  1. heartrate 初始化时机 heartrate.trace() 必须在子进程的 initializer 中调用,不能在主进程调用。否则所有子进程会尝试连接同一个端口,导致 Address already in use 错误。原始代码的 initializer=initialize_worker 是正确的。

完整可运行的 heartrate 版本( run_with_heartrate.py ):

import os
from multiprocessing import Pool
import time
from functools import wraps
import heartrate

# 端口生成策略:基于 PID,确保唯一且安全
PORT_BASE = 8000
PORT_RANGE = 1000

def initialize_worker():
    process_id = os.getpid()
    port = PORT_BASE + (process_id % PORT_RANGE)
    print(f"[{process_id}] Initializing heartrate on port {port}")
    # browser=True 会自动打开浏览器,若不需要可设为 False
    heartrate.trace(browser=False, port=port)

def track_execution_time(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        print(f"[{os.getpid()}] Starting task at {start_time:.2f}")
        result = func(*args, **kwargs)
        end_time = time.time()
        print(f"[{os.getpid()}] Ending task at {end_time:.2f}, duration: {end_time - start_time:.2f}s")
        return result
    return wrapper

@track_execution_time
def meaningless_task(dummy_text_partial):
    words = dummy_text_partial.split()
    lorem_count = sum(1 for word in words if word.lower() == "lorem")
    for i in range(5):
        time.sleep(1)  # 每次循环睡1秒,便于在 heartrate 界面观察
    return lorem_count

def main():
    dummy_text = "Lorem ipsum dolor sit amet, consectetur Lorem adipiscing Lorem elit"
    # 使用 spawn 方法(Windows/macOS 默认,Linux 也可显式指定)
    # multiprocessing.set_start_method("spawn")  # 如需显式设置
    with Pool(processes=2, initializer=initialize_worker) as pool:
        results = pool.map(meaningless_task, dummy_text.split(","))
        print("Word count results:", results)

if __name__ == "__main__":
    main()

运行后,你会看到两个终端日志(每个进程一个),并自动打开两个浏览器窗口(端口 8000 和 8001),实时显示 time.sleep(1) 的执行次数。这就是 heartrate 的价值:可视化验证任务是否真的在多个进程里并行执行,而不是串行。

实操心得: heartrate browser=False 参数很关键。在 CI/CD 或无图形界面服务器上,设为 False 可避免因无法打开浏览器导致的阻塞或报错。日志中会显示 http://localhost:8000 这样的地址,你可以手动在任意浏览器中访问。

4. 常见问题与排查技巧实录:那些让你抓狂的“玄学”问题

4.1 问题速查表:典型报错与根因分析

报错信息 根本原因 排查步骤 解决方案
AttributeError: Can't pickle local object 'xxx.<locals>.yyy' 装饰器 wrapper __qualname__ 非法 1. print(func.__qualname__)
2. 检查装饰器是否漏了 @wraps(func)
@wraps(func) 或手动赋值 __qualname__
AttributeError: Can't get attribute 'xxx' on <module '__main__'> 子进程找不到函数定义(模块未 import 或名字不对) 1. 确认函数在 if __name__ == "__main__": 之外定义
2. print(__name__) 看主模块名
将函数定义移到模块顶层;避免在 if __name__ == "__main__" 内定义函数
OSError: [Errno 98] Address already in use heartrate 端口被占用(多个进程抢同一端口) 1. lsof -i :8000 (macOS/Linux) 或 netstat -ano | findstr :8000 (Windows)
2. 检查 initialize_worker 是否在主进程也被调用
确保 initializer 只在子进程执行;用 os.getpid() 生成唯一端口;重启终端杀掉残留进程
KeyboardInterrupt 不生效(Ctrl+C 无反应) heartrate trace() 在后台线程阻塞了信号 1. 观察进程是否卡在 time.sleep heartrate 调用
2. 尝试 kill -9 <pid>
1. 降低 range() 循环次数(如 range(5)
2. 用 pool.terminate() + pool.join() except KeyboardInterrupt 中优雅退出

4.2 深度排查: fork vs spawn 的陷阱

Python 多进程的启动方法 ( start method ) 是隐形杀手。不同系统默认不同,行为差异巨大:

  • fork (Linux/WSL 默认,Python < 3.14) :子进程是父进程的“内存快照”。 heartrate.trace() 在父进程调用一次,子进程会继承其状态,导致端口冲突。 @wraps 问题在此模式下可能不暴露(因为函数对象已存在),但 heartrate 会崩。
  • spawn (Windows/macOS 默认,Python >= 3.14 Linux 默认) :子进程是全新 Python 解释器,从头 exec 主脚本。 @wraps 问题必然暴露,但 heartrate 端口管理更可控。

如何检查和设置?

import multiprocessing

# 查看当前方法
print("Current start method:", multiprocessing.get_start_method())

# 显式设置为 spawn(推荐,行为一致)
try:
    multiprocessing.set_start_method("spawn")
except RuntimeError:
    # 如果已被设置,忽略
    pass

实操验证 :在 main() 开头加入上述代码,并在 initialize_worker 中打印 os.getpid() __name__

def initialize_worker():
    print(f"[{os.getpid()}] __name__ is {__name__}")  # spawn 下为 '__main__',fork 下可能为 'module_name'
    # ... 其余代码

如果看到子进程的 __name__ '__mp_main__' 或类似,说明是 spawn ;如果是 '__main__' ,则可能是 fork 。统一用 spawn 可避免大部分兼容性问题。

4.3 终极避坑:装饰器 + 类 + 多进程的“死亡组合”

原始文章提到“类装饰器更复杂”,这不是危言耸听。看这个例子:

class Timer:
    def __init__(self, name="Task"):
        self.name = name

    def __call__(self, func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            start = time.time()
            result = func(*args, **kwargs)
            end = time.time()
            print(f"{self.name} took {end - start:.2f}s")
            return result
        return wrapper

# ❌ 危险!Timer 是一个类实例,其 __call__ 返回的 wrapper 仍依赖于 self
@Timer("MyTask")
def risky_task():
    time.sleep(1)

问题在哪? @Timer("MyTask") 创建了一个 Timer 实例, __call__ 方法返回 wrapper 。但 wrapper 的闭包中,包含了对 self (即 Timer 实例)的引用。 self 是一个普通对象, Timer 类本身必须能被 Pickle。如果 Timer 类定义在 if __name__ == "__main__": 里,或者有不可序列化的属性(如文件句柄、数据库连接), wrapper 就无法被 Pickle。

安全做法

  • 将装饰器类定义在模块顶层(不在 if __name__ == "__main__": 内)。
  • 确保装饰器类的所有属性都是可序列化的(基本类型、函数、其他可序列化类)。
  • 或者,放弃类装饰器,用函数装饰器 + functools.partial 模拟参数化:
from functools import partial

def timer_decorator(name="Task"):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            start = time.time()
            result = func(*args, **kwargs)
            end = time.time()
            print(f"{name} took {end - start:.2f}s")
            return result
        return wrapper
    return decorator

# ✅ 安全:timer_decorator("MyTask") 返回的是一个函数,不携带外部状态
@timer_decorator("MyTask")
def safe_task():
    time.sleep(1)

4.4 生产环境加固:超时与错误传播

Pool.map() 在生产环境中必须加超时,否则一个卡死的子进程会让整个程序挂起:

def main():
    dummy_text = "Lorem ipsum dolor sit amet, consectetur Lorem adipiscing Lorem elit"
    with Pool(processes=2, initializer=initialize_worker) as pool:
        try:
            # 设置超时为 10 秒
            results = pool.map(meaningless_task, dummy_text.split(","), timeout=10)
            print("Word count results:", results)
        except multiprocessing.TimeoutError:
            print("任务执行超时!正在终止池...")
            pool.terminate()  # 立即杀死所有子进程
            pool.join()       # 等待它们结束
            raise
        except Exception as e:
            print("发生未预期错误:", e)
            pool.close()
            pool.join()
            raise

同时, multiprocessing 的错误默认不会在主进程抛出,而是静默失败。用 pool.map_async() 可以捕获子进程的异常:

def main_async():
    dummy_text = "Lorem ipsum dolor sit amet, consectetur Lorem adipiscing Lorem elit"
    with Pool(processes=2, initializer=initialize_worker) as pool:
        # map_async 返回 AsyncResult 对象
        async_result = pool.map_async(meaningless_task, dummy_text.split(","))
        try:
            results = async_result.get(timeout=10)  # 等待结果,带超时
            print("Word count results:", results)
        except multiprocessing.TimeoutError:
            print("超时")
        except Exception as e:
            # 子进程中的异常会在这里抛出
            print("子进程报错:", e)

实操心得:我在一个日处理百万级日志的项目中,就因为没加 timeout ,一个网络请求卡死导致整条流水线停摆 2 小时。从此所有 Pool 调用都强制加 timeout ,并配合 async_result.get() 捕获真实错误。这是血的教训。

5. 工具与生态:Beyond heartrate 的现代替代方案

heartrate 是一个绝妙的教学工具,它用最直观的方式展示了“代码执行流”。但作为生产环境的监控方案,它有明显局限:仅支持单机、无持久化、UI 简单。了解它的原理后,你应该知道下一步该用什么。

5.1 生产级替代: psutil + logging 的轻量组合

对于大多数需要“知道任务在哪、跑了多久、有没有卡住”的场景, psutil (获取进程信息)和标准 logging (结构化日志)的组合,比 heartrate 更健壮:

import logging
import psutil
from datetime import datetime

# 配置 JSON 日志(便于 ELK 收集)
logging.basicConfig(
    level=logging.INFO,
    format='{"timestamp": "%(asctime)s", "process": "%(process)d", "message": "%(message)s"}',
    handlers=[logging.StreamHandler()]
)

def track_with_psutil(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        proc = psutil.Process()
        cpu_start = proc.cpu_percent()
        mem_start = proc.memory_info().rss / 1024 / 1024  # MB

        logging.info(f"STARTED: {func.__name__} with args={args}")

        result = func(*args, **kwargs)

        end_time = time.time()
        cpu_end = proc.cpu_percent()
        mem_end = proc.memory_info().rss / 1024 / 1024

        duration = end_time - start_time
        logging.info(
            f"FINISHED: {func.__name__}, "
            f"duration={duration:.2f}s, "
            f"cpu_delta={cpu_end-cpu_start:.2f}%, "
            f"mem_delta={mem_end-mem_start:.2f}MB"
        )
        return result
    return wrapper

优势:零外部依赖、日志可集中收集、CPU/内存指标一目了然、完全兼容 multiprocessing

5.2 可视化升级: Py-Spy —— 无需修改代码的火焰图

heartrate 需要你在代码里加 trace() 。而 Py-Spy 是一个外部工具,它通过读取 Python 进程的内存,实时生成火焰图,完全无需修改你的代码:

# 安装
pip install py-spy

# 启动你的多进程脚本(假设 PID 是 12345)
python your_script.py &

# 用 py-spy 附加到进程,生成火焰图
py-spy record -p 12345 -o profile.svg --duration 30

# 或者实时查看
py-spy top -p 12345

Py-Spy 会显示每个进程里,哪个函数占用了多少 CPU 时间,精确到毫秒。它解决了 heartrate 的最大短板: heartrate 只告诉你“执行了多少次”, Py-Spy 告诉你“哪一行代码最慢”。对于性能调优,这是降维打击。

5.3 分布式扩展: Dask —— 当 multiprocessing 不够用时

当你的任务规模超出单机,或者需要更复杂的任务调度(如依赖关系、优先级、容错), multiprocessing 就显得力不从心。 Dask 是一个成熟的分布式计算框架,它提供了 dask.distributed.Client ,可以无缝地将 Pool.map() 替换为集群计算:

from dask.distributed import Client

# 启动本地集群(或连接远程集群)
client = Client(n_workers=2, threads_per_worker=1)

# 你的函数无需任何修改
future = client.map(meaningless_task, dummy_text.split(","))
results = client.gather(future)  # 等待所有结果
print("Dask results:", results)

Dask 内部也使用 Pickle,但它对装饰器的兼容性更好(因为它有自己的序列化层),并且提供了 Web UI( client.dashboard_link )实时监控每个 worker 的负载、内存、执行历史,功能远超 heartrate

个人体会:我最初痴迷于 heartrate 的酷炫 UI,但在一个需要处理 TB 级数据的项目中,最终切换到了 Dask + Py-Spy 的组合。 heartrate 教会我理解 __qualname__ ,而 Dask 让我明白,真正的工程化,是选择合适的工具链,而不是在单一工具上钻牛角尖。当你能用 dask.distributed.Client 一行代码替换掉整个 multiprocessing 模块,并获得集群能力时,那种“升维”的感觉,比看一百个 heartrate 的进度条都爽。

更多推荐