PyTorch 线程与 vLLM 进程线程的区别

简要回答

torch.num_threads 控制的是 PyTorch 内部计算线程,主要用于 CPU 上的矩阵运算等操作;而 vLLM 进程中的其他线程包括 OpenMP 线程、worker 进程、通信线程等,用于不同的并行任务。vLLM 通过限制 PyTorch 线程数来避免与这些其他线程产生 CPU 资源争用 1

详细说明

1. PyTorch 内部线程(torch.num_threads)

# 获取当前 PyTorch 线程数
current_parallelism = torch.get_num_threads()
# 设置 PyTorch 线程数
torch.set_num_threads(default_omp_num_threads)

这些线程主要用于:

  • CPU 矩阵运算:线性层、注意力计算等
  • 张量操作:reshape、transpose 等
  • BLAS 后端:调用 MKL、OpenBLAS 等库 1

2. vLLM 进程中的其他线程

OpenMP 线程
# CPU worker 中的 OpenMP 线程绑定
ret = torch.ops._C.init_cpu_threads_env(self.local_omp_cpuid)

用于:

  • 并行计算:oneDNN 操作、自定义内核
  • NUMA 绑定:将线程绑定到特定 CPU 核心 2
Worker 进程线程
# 多进程执行器创建的 worker 进程
class WorkerProc:
    def worker_busy_loop(self):
        # 主工作循环线程

用于:

  • 模型执行:GPU/CPU 前向传播
  • 通信协调:进程间数据传输 3
通信线程
# ZMQ 通信线程
async def async_output_busy_loop(self):
    # 异步输出复制线程

用于:

  • 网络通信:API 请求处理
  • 数据流传输:流式输出 4

3. 线程管理策略

vLLM 采用分层线程管理:

线程级

进程级

API Server 进程

Engine Core 进程

GPU Worker 进程

PyTorch 计算线程

OpenMP 并行线程

通信线程

异步任务线程

4. 资源隔离机制

vLLM 通过以下方式避免线程争用:

  1. 限制 PyTorch 线程数:默认设为 1 5
  2. CPU 亲和性绑定:将 OpenMP 线程绑定到特定核心 6
  3. 进程隔离:不同功能运行在独立进程中 7

Notes

  • PyTorch 线程主要用于计算密集型任务
  • vLLM 的其他线程处理 I/O、通信、调度等任务
  • 合理的线程配置对性能至关重要,特别是在容器化环境中
  • 用户可通过 OMP_NUM_THREADS 环境变量自定义线程行为

Wiki pages you might want to explore:

Wiki pages you might want to explore:

Citations

File: vllm/v1/executor/multiproc_executor.py (L618-624)

            self.async_output_queue: queue.Queue = queue.Queue()
            self.async_output_copy_thread = Thread(
                target=self.async_output_busy_loop,
                daemon=True,
                name="WorkerAsyncOutputCopy",
            )
            self.async_output_copy_thread.start()

File: vllm/v1/executor/multiproc_executor.py (L931-937)

    def worker_busy_loop(self):
        """Main busy loop for Multiprocessing Workers"""
        assert self.rpc_broadcast_mq is not None
        while True:
            method, args, kwargs, output_rank = self.rpc_broadcast_mq.dequeue(
                indefinite=True
            )

File: vllm/v1/executor/multiproc_executor.py (L1003-1023)

    # Configure thread parallelism if OMP_NUM_THREADS isn't set
    #
    # Helps to avoid CPU contention. The default of spawning a thread per
    # core combined with multiprocessing for each GPU can have a negative
    # impact on performance. The contention is amplified when running in a
    # container where CPU limits can cause throttling.
    default_omp_num_threads = 1
    if (
        "OMP_NUM_THREADS" not in os.environ
        and (current_parallelism := torch.get_num_threads()) > default_omp_num_threads
    ):
        logger.warning_once(
            "Reducing Torch parallelism from %d threads to %d to avoid "
            "unnecessary CPU contention. Set OMP_NUM_THREADS in the "
            "external environment to tune this value as needed.",
            current_parallelism,
            default_omp_num_threads,
            scope="local",
        )
        os.environ["OMP_NUM_THREADS"] = str(default_omp_num_threads)
        torch.set_num_threads(default_omp_num_threads)

File: vllm/v1/worker/cpu_worker.py (L106-109)

        if self.local_omp_cpuid != "nobind":
            ret = torch.ops._C.init_cpu_threads_env(self.local_omp_cpuid)
            if ret:
                logger.info(ret)

File: csrc/cpu/utils.cpp (L135-149)

  #pragma omp parallel for schedule(static, 1)
  for (size_t i = 0; i < omp_cpu_ids.size(); ++i) {
    cpu_set_t mask;
    CPU_ZERO(&mask);
    CPU_SET(omp_cpu_ids[i], &mask);
    int ret = sched_setaffinity(0, sizeof(cpu_set_t), &mask);
    if (ret == -1) {
      TORCH_CHECK(false,
                  "sched_setaffinity failed. errno: " + std::to_string(errno));
    }

    omp_set_lock(&writelock);
    thread_core_mapping.emplace_back(gettid(), omp_cpu_ids[i]);
    omp_unset_lock(&writelock);
  }

File: docs/design/arch_overview.md (L81-127)

## V1 Process Architecture

vLLM V1 uses a multi-process architecture to separate concerns and maximize throughput. Understanding this architecture is important for properly sizing CPU resources in your deployment. The key processes are:

### API Server Process

The API server process handles HTTP requests (e.g., the OpenAI-compatible API), performs input processing (tokenization, multi-modal data loading), and streams results back to clients. It communicates with the engine core process(es) via ZMQ sockets.

By default, there is **1 API server process**, but when data parallelism is used, the API server count automatically scales to match the data parallel size. This can also be manually configured with the `--api-server-count` flag. Each API server connects to **all** engine cores via ZMQ in a many-to-many topology, enabling any API server to route requests to any engine core. Each API server process uses multiple CPU threads for media loading (controlled by `VLLM_MEDIA_LOADING_THREAD_COUNT`, default 8).

The code can be found in [vllm/entrypoints/openai/api_server.py](../../vllm/entrypoints/openai/api_server.py) and [vllm/v1/utils.py](../../vllm/v1/utils.py).

### Engine Core Process

The engine core process runs the scheduler, manages KV cache, and coordinates model execution across GPU workers. It runs a busy loop that continuously schedules requests and dispatches work to the GPU workers.

There is **1 engine core process per data parallel rank**. For example, with `--data-parallel-size 4`, there are 4 engine core processes.

The code can be found in [vllm/v1/engine/core.py](../../vllm/v1/engine/core.py) and [vllm/v1/engine/utils.py](../../vllm/v1/engine/utils.py).

### GPU Worker Processes

Each GPU is managed by a dedicated worker process. The worker process loads model weights, executes forward passes, and manages GPU memory. Workers communicate with the engine core process that owns them.

There is **1 worker process per GPU**. The total number of GPU worker processes equals `tensor_parallel_size x pipeline_parallel_size` per engine core.

The code can be found in [vllm/v1/executor/multiproc_executor.py](../../vllm/v1/executor/multiproc_executor.py) and [vllm/v1/worker/gpu_worker.py](../../vllm/v1/worker/gpu_worker.py).

### DP Coordinator Process (conditional)

When using data parallelism (`--data-parallel-size > 1`), an additional coordinator process manages load balancing across DP ranks and coordinates synchronized forward passes for MoE models.

There is **1 DP coordinator process** (only when data parallelism is enabled).

The code can be found in [vllm/v1/engine/coordinator.py](../../vllm/v1/engine/coordinator.py).

### Process Count Summary

For a deployment with `N` GPUs, `TP` tensor parallel size, `DP` data parallel size, and `A` API server count:

| Process Type | Count | Notes |
| - | - | - |
| API Server | `A` (default `DP`) | Handles HTTP requests and input processing |
| Engine Core | `DP` (default 1) | Scheduler and KV cache management |
| GPU Worker | `N` (= `DP x PP x TP`) | One per GPU, executes model forward passes |
| DP Coordinator | 1 if `DP > 1`, else 0 | Load balancing across DP ranks |
| **Total** | **`A + DP + N` (+ 1 if DP > 1)** | |

这段代码主要在做两件事:

  1. 判断当前容器/系统实际上能用多少 CPU 核
  2. 在 vLLM 启动多进程 worker 之前,设置合适的多进程和线程环境变量,避免 CPU 线程打架、性能变差

你可以把它理解成:

在启动 worker 进程前,先把“进程怎么起、每个进程开多少 CPU 线程”这两件事调整好。


一、前半部分:读取 cgroups 里的 CPU 限额

这部分代码的目的:

如果程序跑在 Linux 容器里(Docker / K8s 很常见),系统表面上可能看到很多 CPU,但容器实际上只被分配了少量 CPU 配额。代码要识别这个“真实可用 CPU 数”。


1. get_cpu_cores_via_cgroups()

def get_cpu_cores_via_cgroups() -> typing.Optional[int]:
    # cgroups V1
    cfs_period_path = "/sys/fs/cgroup/cpu/cpu.cfs_period_us"
    cfs_quota_path = "/sys/fs/cgroup/cpu/cpu.cfs_quota_us"
    # cgroups V2
    cpu_max_path = "/sys/fs/cgroup/cpu.max"
    cpu_cores = _get_cpu_cores(cfs_period_path, cfs_quota_path, cpu_max_path)
    return cpu_cores

作用:

  • 检查 Linux cgroups 配置
  • 支持:
    • cgroups v1
    • cgroups v2
  • 返回当前容器大约被限制为多少个 CPU 核可用

返回值可能是:

  • 一个整数,比如 24
  • None,表示没读到限制信息

2. _get_cpu_cores(...)

def _get_cpu_cores(
    cfs_period_path: str, cfs_quota_path: str, cpu_max_path: str
) -> typing.Optional[int]:

这是实际干活的函数。


2.1 如果是 cgroups v1

cfs_period = Path(cfs_period_path)
cfs_quota = Path(cfs_quota_path)
if cfs_period.exists() and cfs_quota.exists():

它先检查这两个文件存不存在:

  • cpu.cfs_period_us
  • cpu.cfs_quota_us

这是 cgroups v1 里 CPU 限额的配置文件。


2.2 读取 period 和 quota

with (
    cfs_period.open("rb") as fp_p,
    cfs_quota.open("rb") as fp_q,
):
    p, q = int(fp_p.read()), int(fp_q.read())
  • p = 一个周期长度(微秒)
  • q = 每个周期允许使用的总 CPU 时间(微秒)

比如:

  • p = 100000
  • q = 200000

意思大概是:

每 100000 微秒的周期里,允许总共跑 200000 微秒 CPU 时间

这相当于 2 个 CPU 核 的配额。


2.3 计算“相当于多少核”

cpu_cores = math.ceil(q / p) if q > 0 and p > 0 else None

公式:

cpu核数 = quota / period

并向上取整。

例如:

  • 200000 / 100000 = 2 → 2 核
  • 150000 / 100000 = 1.5ceil 后得到 2 核

这里取整是偏保守/方便配置。


2.4 如果不是 cgroups v1,就看 v2

else:
    cpu_max = Path(cpu_max_path)
    if cpu_max.exists():
        line = cpu_max.read_text()
        cpu_cores = cpu_cores_from_cgroups_v2_cpu_max(line)

如果前面的 v1 文件不存在,就尝试读取 cgroups v2 的:

/sys/fs/cgroup/cpu.max

3. cpu_cores_from_cgroups_v2_cpu_max(line)

def cpu_cores_from_cgroups_v2_cpu_max(line: str) -> typing.Optional[int]:

这个函数专门解析 cgroups v2 的 cpu.max 内容。


3.1 cpu.max 的格式

通常像这样:

200000 100000

或者:

max 100000

3.2 如果是 max

if parts[0] == "max":
    return multiprocessing.cpu_count()

意思是:

  • 没有限制 CPU 配额
  • 就直接返回机器上检测到的 CPU 总数

3.3 如果有限额

q = int(parts[0])
p = int(parts[1])
cpu_cores = math.ceil(q / p) if q > 0 and p > 0 else None

跟 v1 一样,用:

quota / period

来换算成等效 CPU 核数。


二、后半部分:设置多进程 worker 的环境变量

这一段是核心:

def set_multiprocessing_worker_envs(parallel_config: ParallelConfig):

这个函数的作用是:

在创建 vLLM worker 进程之前,先设置好多进程启动方式和线程数,避免资源争抢。

注释里也写了:

This should be called by the parent process before worker processes are created

意思是:

  • 父进程先调用这个函数
  • 然后再创建 worker 子进程

1. 先检查是否必须用 spawn

_maybe_force_spawn()

这里就是你前面问过的那个函数。

它会检查:

  • 是否在 Ray actor 里
  • CUDA / XPU 是否已经初始化

如果是,就强制:

VLLM_WORKER_MULTIPROC_METHOD=spawn

目的:

  • 避免 fork 导致 CUDA / Ray 问题

2. 注释解释:为什么要调线程数

# Configure thread parallelism if OMP_NUM_THREADS isn't set
#
# Helps to avoid CPU contention. The default of spawning a thread per
# core combined with multiprocessing for each GPU can have a negative
# impact on performance. The contention is amplified when running in a
# container where CPU limits can cause throttling.

翻译一下就是:

如果用户没有手动设置 OMP_NUM_THREADS,那就自动配置线程并行度。
这是为了避免 CPU 争抢。
默认情况下,一个进程可能会为每个 CPU core 都开线程;如果再叠加“每个 GPU 一个 worker 进程”,线程总数会爆炸,性能反而下降。
在容器里如果 CPU 还被限额,这个问题会更严重,因为会被 throttling(限速)。


3. 当前 Torch 默认线程数

num_threads = torch.get_num_threads()

意思是:

  • 读取 PyTorch 当前准备使用的 CPU 线程数

比如可能是:

  • 16
  • 32
  • 64

通常默认会比较大,接近机器 CPU 核数。


4. 读取容器 CPU 限额

cgroup_cpu_cores = get_cpu_cores_via_cgroups()

这一步就是前面那套逻辑:

  • 如果跑在容器里,看看实际分到了多少 CPU
  • 例如宿主机有 64 核,但容器只给了 8 核

5. 用 cgroup 限额修正线程数

if isinstance(cgroup_cpu_cores, int):
    num_threads = max(1, min(num_threads, cgroup_cpu_cores))
else:
    num_threads = max(1, num_threads)

意思是:

  • 如果拿到了容器可用 CPU 数:
    • num_threads = min(torch默认线程数, 容器CPU核数)
  • 至少保留 1 个线程

举例:

  • torch 默认 32 线程
  • 容器只给了 6 核
  • 那最终先把 num_threads 压到 6

6. 根据 world_size 计算每个 worker 应该分多少线程

default_omp_num_threads = max(
    1, num_threads // max(1, parallel_config.world_size))

这一步很关键。

parallel_config.world_size 通常表示:

总共会有多少个并行 worker / rank / 进程参与执行

于是代码的想法是:

如果总共可用 CPU 线程数是 num_threads,而你要启动 world_size 个 worker,那每个 worker 平均分一点,不要每个人都把所有 CPU 吃满。

公式就是:

每个 worker 的 OMP_NUM_THREADS = num_threads // world_size

至少为 1。


例子

假设:

  • 容器可用 CPU = 8 核
  • world_size = 4

那么:

default_omp_num_threads = 8 // 4 = 2

意思是:

  • 一共 4 个 worker
  • 每个 worker 最多用 2 个 OMP 线程
  • 总体比较平衡

7. 只有在用户没手动设置 OMP_NUM_THREADS 时才自动干预

if (
    "OMP_NUM_THREADS" not in os.environ
    and (current_parallelism := torch.get_num_threads()) > default_omp_num_threads
):

条件有两个:

条件 1

用户没有手动设置:

OMP_NUM_THREADS

也就是说:

  • 如果用户自己已经明确配置了线程数,代码就尊重用户,不乱改

条件 2

当前 PyTorch 线程数比建议值大

如果已经不大了,就没必要再调小。


8. 打印警告

logger.warning(
    "Reducing Torch parallelism from %d threads to %d to avoid "
    "unnecessary CPU contention. Set OMP_NUM_THREADS in the "
    "external environment to tune this value as needed.",
    current_parallelism,
    default_omp_num_threads,
)

意思是:

正在把 Torch 的线程并行度从 X 降到 Y,避免没必要的 CPU 争抢。
如果你想自己调,建议在外部环境变量里设置 OMP_NUM_THREADS


9. 真正设置环境变量和 Torch 线程数

os.environ["OMP_NUM_THREADS"] = str(default_omp_num_threads)
torch.set_num_threads(default_omp_num_threads)

这一步做了两件事:

设置环境变量

OMP_NUM_THREADS=...

这个通常影响 OpenMP 线程池行为。

设置 PyTorch 当前线程数

torch.set_num_threads(...)

让当前进程里 PyTorch 的 CPU 并行线程数也同步生效。


三、为什么要这样做?

简单说,就是防止:

多进程 × 多线程 = 线程数爆炸


一个典型问题场景

假设:

  • 机器有 32 个 CPU 核
  • 你开 8 个 worker 进程(比如 8 张 GPU)
  • 每个 worker 默认都开 32 个 CPU 线程

那总线程数就可能变成:

8 × 32 = 256

结果:

  • 大量线程争抢 CPU
  • 频繁上下文切换
  • 实际吞吐下降
  • 延迟变差
  • 容器里更容易因为 CPU quota 被 throttle

也就是说:

线程不是越多越快,乱开线程常常更慢。


四、这段代码整体在做什么

可以总结成下面这个流程:


第一步:检查多进程启动方式

_maybe_force_spawn()

如果 CUDA/Ray 场景不安全,就强制 worker 用 spawn


第二步:估算真实 CPU 资源

get_cpu_cores_via_cgroups()

看容器实际分到了多少 CPU,不要只看宿主机总核数。


第三步:按 worker 数量平分 CPU 线程

default_omp_num_threads = num_threads // world_size

避免每个 worker 都把所有 CPU 线程占满。


第四步:如果用户没配,就自动把线程数降下来

OMP_NUM_THREADS
torch.set_num_threads(...)

减少 CPU contention(争抢)。


五、一句话总结

这段代码是在:

vLLM 启动多进程 worker 前,根据 CUDA/Ray 环境决定是否强制用 spawn,再根据容器 CPU 配额和并行 worker 数量,自动把每个进程的 CPU 线程数降到合理范围,避免多进程下线程打架导致性能下降。


如果你愿意,我还能继续帮你讲这几个点里的任意一个:

  1. OMP_NUM_THREADS 到底控制什么
  2. torch.get_num_threads()OMP_NUM_THREADS 的关系
  3. 为什么 world_size 增大时每个 worker 的线程数要减少
  4. 在 Docker / K8s 里这段代码为什么特别重要

OpenMP 线程不是这段代码里直接 Thread(...) 启动的,而是:

当底层 C/C++/Fortran 代码进入一个 OpenMP 并行区域时,由 OpenMP 运行时自动创建/复用线程池。

也就是说,这段代码只是:

  • 设置了环境变量 OMP_NUM_THREADS
  • 调用了 torch.set_num_threads(...)

它们是在告诉底层库“最多用多少线程”,但真正启动线程往往发生在后面执行某些 CPU 算子的时候。


1. 不是 Python 这里手动启动的

你看到的代码里:

os.environ["OMP_NUM_THREADS"] = str(default_omp_num_threads)
torch.set_num_threads(default_omp_num_threads)

这里只是在“配置”。

它没有做类似:

threading.Thread(...)

也没有做:

multiprocessing.Process(...)

所以 OpenMP 线程不是在这段 Python 代码里显式创建的。


2. 那到底是谁启动的?

通常是这些底层组件之一:

  • PyTorch 的 C++ 后端
  • MKL
  • oneDNN
  • OpenBLAS(虽然它不一定用 OpenMP)
  • NumPy / SciPy 底层线性代数库
  • 其他调用 OpenMP 的 C/C++ 扩展

比如当你执行:

torch.matmul(a, b)

或者某些 CPU 上的张量操作时,底层 C++ 实现可能会进入 OpenMP 并行区域,类似概念上:

#pragma omp parallel for
for (...) {
    ...
}

当执行到这里时,OpenMP runtime 才会决定:

  • 要不要创建线程池
  • 创建多少线程
  • 是新建还是复用已有线程

3. 具体在 PyTorch 里是什么情况

PyTorch 有两类常见线程并行:

3.1 intra-op 并行

就是:

一个算子内部开多个线程并行

比如一个大矩阵乘法、卷积、张量遍历等。

这通常和:

torch.get_num_threads()
torch.set_num_threads()

关系很大。


3.2 inter-op 并行

就是:

多个独立算子之间并行调度

这个对应的是另外的接口,比如:

torch.set_num_interop_threads(...)

你这里的代码主要调的是intra-op线程数,也就是单个算子内部用多少 CPU 线程。


4. OMP_NUM_THREADS 在什么时候生效

OMP_NUM_THREADS 是 OpenMP 运行时常见的环境变量,表示:

OpenMP 并行区域默认最多使用多少线程

一般它在底层库初始化或第一次进入并行区域时起作用。

也就是说:

  • 你先设置好 OMP_NUM_THREADS
  • 后面某个 PyTorch CPU 算子触发 OpenMP
  • OpenMP runtime 根据这个值决定线程数

5. 一个形象理解

你可以把这段代码理解为:

  • Python 这边只是给厨房下命令:
    “最多安排 4 个厨师干活”
  • 真正什么时候把厨师叫出来,是后厨底层系统决定的
  • 当订单来了(比如矩阵乘法、卷积),后厨才真的启用这些线程

所以:

线程是在底层算子执行时启动的,不是在这段配置代码里启动的。


6. 什么时候会“第一次看到”这些线程

通常是在你执行某些 CPU 密集型操作后,比如:

import torch
torch.set_num_threads(8)

x = torch.randn(5000, 5000)
y = torch.randn(5000, 5000)
z = x @ y

执行矩阵乘法时,底层线程可能才真正被创建。

如果你用 htoptop -Hps -T 去看,往往这时候能看到进程里多出很多线程。


7. 在 vLLM 场景里为什么要提前设

因为 vLLM 会启动多个 worker 进程。

如果不提前限制,每个 worker 后续执行 CPU 相关操作时,底层 OpenMP 可能都想开很多线程。

比如:

  • 4 个 worker
  • 每个 worker 默认开 16 个 OpenMP 线程

那总共就可能有 64 个线程在抢 CPU。

所以这里先设置:

OMP_NUM_THREADS
torch.set_num_threads(...)

是为了让将来底层算子启动 OpenMP 线程时,不要开太多


8. 更准确一点:OpenMP 线程不一定每次都“重新启动”

很多 OpenMP runtime 会维护线程池。

所以更准确地说:

  • 第一次进入并行区域时,可能创建线程
  • 之后再次执行并行区域时,可能复用已有线程
  • 不一定每次计算都重新创建/销毁

因此你也可以理解为:

OMP_NUM_THREADS 控制的是 OpenMP 线程池规模上限,而不是每次都现开多少个 Python 线程。


9. 一句话回答

OpenMP 线程在哪里启动的?

不是在这段 Python 代码里启动的,而是在 PyTorch / MKL / oneDNN 等底层 C/C++ 库执行 CPU 并行算子、进入 OpenMP 并行区域时,由 OpenMP runtime 自动创建或复用的。


如果你愿意,我还可以进一步讲:

  1. torch.set_num_threads()OMP_NUM_THREADS 谁优先
  2. PyTorch 到底是用 OpenMP、MKL 还是 oneDNN
  3. 怎么实际观察一个 Python 进程里 OpenMP 线程是不是被创建了

这段代码主要解决 vLLM 多进程(GPU worker)场景下 CPU 资源分配与线程竞争的问题。核心目标有两个:

  1. 强制使用 spawn 多进程启动方式(通过调用前面定义的 _maybe_force_spawn(),避免 fork 带来的 CUDA/XPU 状态问题)。
  2. 动态调整每个 worker 进程的 OpenMP 线程数OMP_NUM_THREADS),防止在容器或受限环境中因线程过多导致 CPU 过度竞争,影响推理性能。

下面拆解具体逻辑:


一、获取容器分配的 CPU 核心数(cgroups 限制)

get_cpu_cores_via_cgroups() -> int | None
  • 兼容 cgroup v1:读取 /sys/fs/cgroup/cpu/cpu.cfs_period_uscpu.cfs_quota_us,用 ceil(quota / period) 算出可用的 CPU 核心数。
  • 兼容 cgroup v2:读取 /sys/fs/cgroup/cpu.max,解析格式如 "200000 100000""max 100000"。若为 "max" 表示无限制,返回物理 CPU 总数;否则同样用 ceil(quota / period) 计算。

该函数返回容器允许使用的 CPU 核心数(若未限制则返回 None)。


二、多进程 worker 环境配置 set_multiprocessing_worker_envs

输入 parallel_config(包含 world_size,即并行的 GPU worker 数量)。流程如下:

  1. 调用 _maybe_force_spawn()
    确保所有 worker 子进程都使用 spawn 方式启动(前面已解释过其作用)。

  2. 计算每个 worker 合适的 OpenMP 线程数

    • 获取当前 PyTorch 默认线程数:torch.get_num_threads()(通常等于物理 CPU 核心数)。
    • 如果 cgroup 限制了 CPU 核心数 cgroup_cpu_cores,则取 min(默认线程数, cgroup_cpu_cores) 作为可用总线程数;否则用默认值。
    • 将总线程数除以 world_size(并向上取整)得到 每个 worker 应使用的线程数
      default_omp_num_threads = max(1, 总线程数 // world_size)
  3. 调整 OMP_NUM_THREADS
    如果环境变量中没有设置 OMP_NUM_THREADS,且当前 PyTorch 的线程数大于上面计算的合理值,则:

    • 记录警告日志,提示减少线程数以避免 CPU 争用。
    • 设置 os.environ["OMP_NUM_THREADS"] = str(default_omp_num_threads)
    • 调用 torch.set_num_threads(default_omp_num_threads) 立即生效。

为什么需要这些操作?

  • 在 GPU 推理中,每个 worker 进程除了使用 GPU,也会在 CPU 上执行一些预处理/后处理或调度逻辑。如果每个 worker 都开启与 CPU 核心数相同的线程(例如 32 核开启 32 线程),而 world_size=8 时,总线程数会暴涨到 256,造成严重的 CPU 上下文切换和争用,反而降低性能。
  • 在容器环境中,用户可能只分配了 4 个 CPU 核心,但 PyTorch 默认仍会读取宿主机的 32 个核心,导致线程被频繁挂起或节流。通过读取 cgroups 可以真实反映可用资源,避免超配。
  • spawn 强制使用:因为在 Ray actor 中或 CUDA 初始化后使用 fork 会导致子进程无法正确继承 GPU 上下文或死锁,必须强制 spawn

总结:这段代码是 vLLM 为多进程 GPU worker 做的资源感知与自适应调优,保证在各类环境(物理机、容器、Ray 集群)下都能避免 CPU 竞争,稳定高效运行。

openmp 线程在哪里启动的?

在这里插入图片描述
OpenMP 线程并不是由 Python 代码显式启动的,而是由底层的 OpenMP 运行时库(如 libgomplibiomp5)在需要时自动创建和管理。下面是具体的启动时机和机制:


1. 线程创建时机:第一次进入并行区域

OpenMP 采用延迟创建策略。线程池通常不是在程序启动时创建,而是在首次执行并行构造(例如 #pragma omp parallel)时才真正启动。此后,线程池会一直存在,供后续并行区域复用。

在 PyTorch 中,常见的会触发 OpenMP 并行计算的算子包括:

  • 矩阵乘法(torch.mm, torch.bmm, torch.matmul
  • 卷积(torch.nn.functional.conv2d
  • 逐元素运算(torch.add 等,当张量足够大时)
  • 归约操作(torch.sum 等)

示例
当你第一次执行一个大的 a @ b 时,PyTorch 底层会调用 OpenMP 版本的矩阵乘法实现,此时 OpenMP 运行时检测到 OMP_NUM_THREADS 的值(或默认值),并创建相应数量的工作线程。


2. torch.set_num_threads 的作用

torch.set_num_threads(n) 会设置 PyTorch 内部线程池的大小。它主要做两件事:

  • 设置 OpenMP 的 num_threads 默认值(通过 omp_set_num_threads)。
  • 如果 PyTorch 编译时使用了 TBB 或 native 线程池,也会调整对应后端。

但注意:调用 set_num_threads 本身并不会立即启动任何线程。它只是修改了一个配置值,真正的线程仍然在第一次并行操作时才创建。


3. 环境变量 OMP_NUM_THREADS 的作用

OMP_NUM_THREADS 是一个标准环境变量,OpenMP 运行时会在程序启动时读取它作为默认线程数。如果代码中没有调用 omp_set_num_threads 覆盖,就使用这个值。

因此,在前面的代码片段中:

os.environ["OMP_NUM_THREADS"] = str(default_omp_num_threads)
torch.set_num_threads(default_omp_num_threads)
  • 设置环境变量是为了子进程继承(因为 spawn 方式启动的子进程会重新读取环境变量)。
  • 同时调用 torch.set_num_threads 让当前进程立即生效。

4. 实际线程创建位置(开发者视角)

如果你想在源码级别找到线程创建点,可以关注:

  • OpenMP 库初始化:例如 libgompgomp_team_start 函数会在第一次并行区域被调用时创建线程。
  • PyTorch 的 at::init_num_threads:会调用 at::set_num_threads,但依然不创建线程,只是设置值。

总结

  • 线程启动时机:第一次执行需要 CPU 并行计算的算子(如大矩阵乘)时。
  • 启动位置:在 OpenMP 运行时的 parallel 区域入口,例如 PyTorch 调用的 ATen/Parallel.h 中的 parallel_for
  • 控制方式:通过 OMP_NUM_THREADS 环境变量或 torch.set_num_threads 指定线程数,但不能直接控制线程的启动时刻。

如果你的实际问题是“我想在代码中明确看到线程创建的过程”,那么几乎不可能在纯 Python 层捕捉到——它发生在编译好的二进制库内部,由 OpenMP 运行时透明管理。

CPU worker啥时候启动的,和GPU worker区别?两者会同时启动吗? openmp 线程这个线程会和vllm 中线程竞争用cPU吗

openmp 线程这个线程会和vllm 中线程竞争用cPU吗 vllm 中使用GPU worker时,还要用到openmp 嘛

Logo

免费领 200 小时云算力,进群参与显卡、AI PC 幸运抽奖

更多推荐