【vllm】OMP_NUM_THREADS
PyTorch 线程与 vLLM 进程线程的区别
简要回答
torch.num_threads 控制的是 PyTorch 内部计算线程,主要用于 CPU 上的矩阵运算等操作;而 vLLM 进程中的其他线程包括 OpenMP 线程、worker 进程、通信线程等,用于不同的并行任务。vLLM 通过限制 PyTorch 线程数来避免与这些其他线程产生 CPU 资源争用 1 。
详细说明
1. PyTorch 内部线程(torch.num_threads)
# 获取当前 PyTorch 线程数
current_parallelism = torch.get_num_threads()
# 设置 PyTorch 线程数
torch.set_num_threads(default_omp_num_threads)
这些线程主要用于:
- CPU 矩阵运算:线性层、注意力计算等
- 张量操作:reshape、transpose 等
- BLAS 后端:调用 MKL、OpenBLAS 等库 1
2. vLLM 进程中的其他线程
OpenMP 线程
# CPU worker 中的 OpenMP 线程绑定
ret = torch.ops._C.init_cpu_threads_env(self.local_omp_cpuid)
用于:
- 并行计算:oneDNN 操作、自定义内核
- NUMA 绑定:将线程绑定到特定 CPU 核心 2
Worker 进程线程
# 多进程执行器创建的 worker 进程
class WorkerProc:
def worker_busy_loop(self):
# 主工作循环线程
用于:
- 模型执行:GPU/CPU 前向传播
- 通信协调:进程间数据传输 3
通信线程
# ZMQ 通信线程
async def async_output_busy_loop(self):
# 异步输出复制线程
用于:
- 网络通信:API 请求处理
- 数据流传输:流式输出 4
3. 线程管理策略
vLLM 采用分层线程管理:
4. 资源隔离机制
vLLM 通过以下方式避免线程争用:
Notes
- PyTorch 线程主要用于计算密集型任务
- vLLM 的其他线程处理 I/O、通信、调度等任务
- 合理的线程配置对性能至关重要,特别是在容器化环境中
- 用户可通过
OMP_NUM_THREADS环境变量自定义线程行为
Wiki pages you might want to explore:
- Worker and Executor Architecture (vllm-project/vllm)
- XPU, CPU, and TPU Platforms (vllm-project/vllm)
Wiki pages you might want to explore:
Citations
File: vllm/v1/executor/multiproc_executor.py (L618-624)
self.async_output_queue: queue.Queue = queue.Queue()
self.async_output_copy_thread = Thread(
target=self.async_output_busy_loop,
daemon=True,
name="WorkerAsyncOutputCopy",
)
self.async_output_copy_thread.start()
File: vllm/v1/executor/multiproc_executor.py (L931-937)
def worker_busy_loop(self):
"""Main busy loop for Multiprocessing Workers"""
assert self.rpc_broadcast_mq is not None
while True:
method, args, kwargs, output_rank = self.rpc_broadcast_mq.dequeue(
indefinite=True
)
File: vllm/v1/executor/multiproc_executor.py (L1003-1023)
# Configure thread parallelism if OMP_NUM_THREADS isn't set
#
# Helps to avoid CPU contention. The default of spawning a thread per
# core combined with multiprocessing for each GPU can have a negative
# impact on performance. The contention is amplified when running in a
# container where CPU limits can cause throttling.
default_omp_num_threads = 1
if (
"OMP_NUM_THREADS" not in os.environ
and (current_parallelism := torch.get_num_threads()) > default_omp_num_threads
):
logger.warning_once(
"Reducing Torch parallelism from %d threads to %d to avoid "
"unnecessary CPU contention. Set OMP_NUM_THREADS in the "
"external environment to tune this value as needed.",
current_parallelism,
default_omp_num_threads,
scope="local",
)
os.environ["OMP_NUM_THREADS"] = str(default_omp_num_threads)
torch.set_num_threads(default_omp_num_threads)
File: vllm/v1/worker/cpu_worker.py (L106-109)
if self.local_omp_cpuid != "nobind":
ret = torch.ops._C.init_cpu_threads_env(self.local_omp_cpuid)
if ret:
logger.info(ret)
File: csrc/cpu/utils.cpp (L135-149)
#pragma omp parallel for schedule(static, 1)
for (size_t i = 0; i < omp_cpu_ids.size(); ++i) {
cpu_set_t mask;
CPU_ZERO(&mask);
CPU_SET(omp_cpu_ids[i], &mask);
int ret = sched_setaffinity(0, sizeof(cpu_set_t), &mask);
if (ret == -1) {
TORCH_CHECK(false,
"sched_setaffinity failed. errno: " + std::to_string(errno));
}
omp_set_lock(&writelock);
thread_core_mapping.emplace_back(gettid(), omp_cpu_ids[i]);
omp_unset_lock(&writelock);
}
File: docs/design/arch_overview.md (L81-127)
## V1 Process Architecture
vLLM V1 uses a multi-process architecture to separate concerns and maximize throughput. Understanding this architecture is important for properly sizing CPU resources in your deployment. The key processes are:
### API Server Process
The API server process handles HTTP requests (e.g., the OpenAI-compatible API), performs input processing (tokenization, multi-modal data loading), and streams results back to clients. It communicates with the engine core process(es) via ZMQ sockets.
By default, there is **1 API server process**, but when data parallelism is used, the API server count automatically scales to match the data parallel size. This can also be manually configured with the `--api-server-count` flag. Each API server connects to **all** engine cores via ZMQ in a many-to-many topology, enabling any API server to route requests to any engine core. Each API server process uses multiple CPU threads for media loading (controlled by `VLLM_MEDIA_LOADING_THREAD_COUNT`, default 8).
The code can be found in [vllm/entrypoints/openai/api_server.py](../../vllm/entrypoints/openai/api_server.py) and [vllm/v1/utils.py](../../vllm/v1/utils.py).
### Engine Core Process
The engine core process runs the scheduler, manages KV cache, and coordinates model execution across GPU workers. It runs a busy loop that continuously schedules requests and dispatches work to the GPU workers.
There is **1 engine core process per data parallel rank**. For example, with `--data-parallel-size 4`, there are 4 engine core processes.
The code can be found in [vllm/v1/engine/core.py](../../vllm/v1/engine/core.py) and [vllm/v1/engine/utils.py](../../vllm/v1/engine/utils.py).
### GPU Worker Processes
Each GPU is managed by a dedicated worker process. The worker process loads model weights, executes forward passes, and manages GPU memory. Workers communicate with the engine core process that owns them.
There is **1 worker process per GPU**. The total number of GPU worker processes equals `tensor_parallel_size x pipeline_parallel_size` per engine core.
The code can be found in [vllm/v1/executor/multiproc_executor.py](../../vllm/v1/executor/multiproc_executor.py) and [vllm/v1/worker/gpu_worker.py](../../vllm/v1/worker/gpu_worker.py).
### DP Coordinator Process (conditional)
When using data parallelism (`--data-parallel-size > 1`), an additional coordinator process manages load balancing across DP ranks and coordinates synchronized forward passes for MoE models.
There is **1 DP coordinator process** (only when data parallelism is enabled).
The code can be found in [vllm/v1/engine/coordinator.py](../../vllm/v1/engine/coordinator.py).
### Process Count Summary
For a deployment with `N` GPUs, `TP` tensor parallel size, `DP` data parallel size, and `A` API server count:
| Process Type | Count | Notes |
| - | - | - |
| API Server | `A` (default `DP`) | Handles HTTP requests and input processing |
| Engine Core | `DP` (default 1) | Scheduler and KV cache management |
| GPU Worker | `N` (= `DP x PP x TP`) | One per GPU, executes model forward passes |
| DP Coordinator | 1 if `DP > 1`, else 0 | Load balancing across DP ranks |
| **Total** | **`A + DP + N` (+ 1 if DP > 1)** | |
这段代码主要在做两件事:
- 判断当前容器/系统实际上能用多少 CPU 核
- 在 vLLM 启动多进程 worker 之前,设置合适的多进程和线程环境变量,避免 CPU 线程打架、性能变差
你可以把它理解成:
在启动 worker 进程前,先把“进程怎么起、每个进程开多少 CPU 线程”这两件事调整好。
一、前半部分:读取 cgroups 里的 CPU 限额
这部分代码的目的:
如果程序跑在 Linux 容器里(Docker / K8s 很常见),系统表面上可能看到很多 CPU,但容器实际上只被分配了少量 CPU 配额。代码要识别这个“真实可用 CPU 数”。
1. get_cpu_cores_via_cgroups()
def get_cpu_cores_via_cgroups() -> typing.Optional[int]:
# cgroups V1
cfs_period_path = "/sys/fs/cgroup/cpu/cpu.cfs_period_us"
cfs_quota_path = "/sys/fs/cgroup/cpu/cpu.cfs_quota_us"
# cgroups V2
cpu_max_path = "/sys/fs/cgroup/cpu.max"
cpu_cores = _get_cpu_cores(cfs_period_path, cfs_quota_path, cpu_max_path)
return cpu_cores
作用:
- 检查 Linux cgroups 配置
- 支持:
- cgroups v1
- cgroups v2
- 返回当前容器大约被限制为多少个 CPU 核可用
返回值可能是:
- 一个整数,比如
2、4 - 或
None,表示没读到限制信息
2. _get_cpu_cores(...)
def _get_cpu_cores(
cfs_period_path: str, cfs_quota_path: str, cpu_max_path: str
) -> typing.Optional[int]:
这是实际干活的函数。
2.1 如果是 cgroups v1
cfs_period = Path(cfs_period_path)
cfs_quota = Path(cfs_quota_path)
if cfs_period.exists() and cfs_quota.exists():
它先检查这两个文件存不存在:
cpu.cfs_period_uscpu.cfs_quota_us
这是 cgroups v1 里 CPU 限额的配置文件。
2.2 读取 period 和 quota
with (
cfs_period.open("rb") as fp_p,
cfs_quota.open("rb") as fp_q,
):
p, q = int(fp_p.read()), int(fp_q.read())
p= 一个周期长度(微秒)q= 每个周期允许使用的总 CPU 时间(微秒)
比如:
p = 100000q = 200000
意思大概是:
每 100000 微秒的周期里,允许总共跑 200000 微秒 CPU 时间
这相当于 2 个 CPU 核 的配额。
2.3 计算“相当于多少核”
cpu_cores = math.ceil(q / p) if q > 0 and p > 0 else None
公式:
cpu核数 = quota / period
并向上取整。
例如:
200000 / 100000 = 2→ 2 核150000 / 100000 = 1.5→ceil后得到 2 核
这里取整是偏保守/方便配置。
2.4 如果不是 cgroups v1,就看 v2
else:
cpu_max = Path(cpu_max_path)
if cpu_max.exists():
line = cpu_max.read_text()
cpu_cores = cpu_cores_from_cgroups_v2_cpu_max(line)
如果前面的 v1 文件不存在,就尝试读取 cgroups v2 的:
/sys/fs/cgroup/cpu.max
3. cpu_cores_from_cgroups_v2_cpu_max(line)
def cpu_cores_from_cgroups_v2_cpu_max(line: str) -> typing.Optional[int]:
这个函数专门解析 cgroups v2 的 cpu.max 内容。
3.1 cpu.max 的格式
通常像这样:
200000 100000
或者:
max 100000
3.2 如果是 max
if parts[0] == "max":
return multiprocessing.cpu_count()
意思是:
- 没有限制 CPU 配额
- 就直接返回机器上检测到的 CPU 总数
3.3 如果有限额
q = int(parts[0])
p = int(parts[1])
cpu_cores = math.ceil(q / p) if q > 0 and p > 0 else None
跟 v1 一样,用:
quota / period
来换算成等效 CPU 核数。
二、后半部分:设置多进程 worker 的环境变量
这一段是核心:
def set_multiprocessing_worker_envs(parallel_config: ParallelConfig):
这个函数的作用是:
在创建 vLLM worker 进程之前,先设置好多进程启动方式和线程数,避免资源争抢。
注释里也写了:
This should be called by the parent process before worker processes are created
意思是:
- 父进程先调用这个函数
- 然后再创建 worker 子进程
1. 先检查是否必须用 spawn
_maybe_force_spawn()
这里就是你前面问过的那个函数。
它会检查:
- 是否在 Ray actor 里
- CUDA / XPU 是否已经初始化
如果是,就强制:
VLLM_WORKER_MULTIPROC_METHOD=spawn
目的:
- 避免 fork 导致 CUDA / Ray 问题
2. 注释解释:为什么要调线程数
# Configure thread parallelism if OMP_NUM_THREADS isn't set
#
# Helps to avoid CPU contention. The default of spawning a thread per
# core combined with multiprocessing for each GPU can have a negative
# impact on performance. The contention is amplified when running in a
# container where CPU limits can cause throttling.
翻译一下就是:
如果用户没有手动设置
OMP_NUM_THREADS,那就自动配置线程并行度。
这是为了避免 CPU 争抢。
默认情况下,一个进程可能会为每个 CPU core 都开线程;如果再叠加“每个 GPU 一个 worker 进程”,线程总数会爆炸,性能反而下降。
在容器里如果 CPU 还被限额,这个问题会更严重,因为会被 throttling(限速)。
3. 当前 Torch 默认线程数
num_threads = torch.get_num_threads()
意思是:
- 读取 PyTorch 当前准备使用的 CPU 线程数
比如可能是:
- 16
- 32
- 64
通常默认会比较大,接近机器 CPU 核数。
4. 读取容器 CPU 限额
cgroup_cpu_cores = get_cpu_cores_via_cgroups()
这一步就是前面那套逻辑:
- 如果跑在容器里,看看实际分到了多少 CPU
- 例如宿主机有 64 核,但容器只给了 8 核
5. 用 cgroup 限额修正线程数
if isinstance(cgroup_cpu_cores, int):
num_threads = max(1, min(num_threads, cgroup_cpu_cores))
else:
num_threads = max(1, num_threads)
意思是:
- 如果拿到了容器可用 CPU 数:
num_threads = min(torch默认线程数, 容器CPU核数)
- 至少保留 1 个线程
举例:
- torch 默认 32 线程
- 容器只给了 6 核
- 那最终先把
num_threads压到 6
6. 根据 world_size 计算每个 worker 应该分多少线程
default_omp_num_threads = max(
1, num_threads // max(1, parallel_config.world_size))
这一步很关键。
parallel_config.world_size 通常表示:
总共会有多少个并行 worker / rank / 进程参与执行
于是代码的想法是:
如果总共可用 CPU 线程数是
num_threads,而你要启动world_size个 worker,那每个 worker 平均分一点,不要每个人都把所有 CPU 吃满。
公式就是:
每个 worker 的 OMP_NUM_THREADS = num_threads // world_size
至少为 1。
例子
假设:
- 容器可用 CPU = 8 核
world_size = 4
那么:
default_omp_num_threads = 8 // 4 = 2
意思是:
- 一共 4 个 worker
- 每个 worker 最多用 2 个 OMP 线程
- 总体比较平衡
7. 只有在用户没手动设置 OMP_NUM_THREADS 时才自动干预
if (
"OMP_NUM_THREADS" not in os.environ
and (current_parallelism := torch.get_num_threads()) > default_omp_num_threads
):
条件有两个:
条件 1
用户没有手动设置:
OMP_NUM_THREADS
也就是说:
- 如果用户自己已经明确配置了线程数,代码就尊重用户,不乱改
条件 2
当前 PyTorch 线程数比建议值大
如果已经不大了,就没必要再调小。
8. 打印警告
logger.warning(
"Reducing Torch parallelism from %d threads to %d to avoid "
"unnecessary CPU contention. Set OMP_NUM_THREADS in the "
"external environment to tune this value as needed.",
current_parallelism,
default_omp_num_threads,
)
意思是:
正在把 Torch 的线程并行度从 X 降到 Y,避免没必要的 CPU 争抢。
如果你想自己调,建议在外部环境变量里设置OMP_NUM_THREADS。
9. 真正设置环境变量和 Torch 线程数
os.environ["OMP_NUM_THREADS"] = str(default_omp_num_threads)
torch.set_num_threads(default_omp_num_threads)
这一步做了两件事:
设置环境变量
OMP_NUM_THREADS=...
这个通常影响 OpenMP 线程池行为。
设置 PyTorch 当前线程数
torch.set_num_threads(...)
让当前进程里 PyTorch 的 CPU 并行线程数也同步生效。
三、为什么要这样做?
简单说,就是防止:
多进程 × 多线程 = 线程数爆炸
一个典型问题场景
假设:
- 机器有 32 个 CPU 核
- 你开 8 个 worker 进程(比如 8 张 GPU)
- 每个 worker 默认都开 32 个 CPU 线程
那总线程数就可能变成:
8 × 32 = 256
结果:
- 大量线程争抢 CPU
- 频繁上下文切换
- 实际吞吐下降
- 延迟变差
- 容器里更容易因为 CPU quota 被 throttle
也就是说:
线程不是越多越快,乱开线程常常更慢。
四、这段代码整体在做什么
可以总结成下面这个流程:
第一步:检查多进程启动方式
_maybe_force_spawn()
如果 CUDA/Ray 场景不安全,就强制 worker 用 spawn。
第二步:估算真实 CPU 资源
get_cpu_cores_via_cgroups()
看容器实际分到了多少 CPU,不要只看宿主机总核数。
第三步:按 worker 数量平分 CPU 线程
default_omp_num_threads = num_threads // world_size
避免每个 worker 都把所有 CPU 线程占满。
第四步:如果用户没配,就自动把线程数降下来
OMP_NUM_THREADS
torch.set_num_threads(...)
减少 CPU contention(争抢)。
五、一句话总结
这段代码是在:
vLLM 启动多进程 worker 前,根据 CUDA/Ray 环境决定是否强制用
spawn,再根据容器 CPU 配额和并行 worker 数量,自动把每个进程的 CPU 线程数降到合理范围,避免多进程下线程打架导致性能下降。
如果你愿意,我还能继续帮你讲这几个点里的任意一个:
OMP_NUM_THREADS到底控制什么torch.get_num_threads()和OMP_NUM_THREADS的关系- 为什么
world_size增大时每个 worker 的线程数要减少 - 在 Docker / K8s 里这段代码为什么特别重要
OpenMP 线程不是这段代码里直接 Thread(...) 启动的,而是:
当底层 C/C++/Fortran 代码进入一个 OpenMP 并行区域时,由 OpenMP 运行时自动创建/复用线程池。
也就是说,这段代码只是:
- 设置了环境变量
OMP_NUM_THREADS - 调用了
torch.set_num_threads(...)
它们是在告诉底层库“最多用多少线程”,但真正启动线程往往发生在后面执行某些 CPU 算子的时候。
1. 不是 Python 这里手动启动的
你看到的代码里:
os.environ["OMP_NUM_THREADS"] = str(default_omp_num_threads)
torch.set_num_threads(default_omp_num_threads)
这里只是在“配置”。
它没有做类似:
threading.Thread(...)
也没有做:
multiprocessing.Process(...)
所以 OpenMP 线程不是在这段 Python 代码里显式创建的。
2. 那到底是谁启动的?
通常是这些底层组件之一:
- PyTorch 的 C++ 后端
- MKL
- oneDNN
- OpenBLAS(虽然它不一定用 OpenMP)
- NumPy / SciPy 底层线性代数库
- 其他调用 OpenMP 的 C/C++ 扩展
比如当你执行:
torch.matmul(a, b)
或者某些 CPU 上的张量操作时,底层 C++ 实现可能会进入 OpenMP 并行区域,类似概念上:
#pragma omp parallel for
for (...) {
...
}
当执行到这里时,OpenMP runtime 才会决定:
- 要不要创建线程池
- 创建多少线程
- 是新建还是复用已有线程
3. 具体在 PyTorch 里是什么情况
PyTorch 有两类常见线程并行:
3.1 intra-op 并行
就是:
一个算子内部开多个线程并行
比如一个大矩阵乘法、卷积、张量遍历等。
这通常和:
torch.get_num_threads()
torch.set_num_threads()
关系很大。
3.2 inter-op 并行
就是:
多个独立算子之间并行调度
这个对应的是另外的接口,比如:
torch.set_num_interop_threads(...)
你这里的代码主要调的是intra-op线程数,也就是单个算子内部用多少 CPU 线程。
4. OMP_NUM_THREADS 在什么时候生效
OMP_NUM_THREADS 是 OpenMP 运行时常见的环境变量,表示:
OpenMP 并行区域默认最多使用多少线程
一般它在底层库初始化或第一次进入并行区域时起作用。
也就是说:
- 你先设置好
OMP_NUM_THREADS - 后面某个 PyTorch CPU 算子触发 OpenMP
- OpenMP runtime 根据这个值决定线程数
5. 一个形象理解
你可以把这段代码理解为:
- Python 这边只是给厨房下命令:
“最多安排 4 个厨师干活” - 真正什么时候把厨师叫出来,是后厨底层系统决定的
- 当订单来了(比如矩阵乘法、卷积),后厨才真的启用这些线程
所以:
线程是在底层算子执行时启动的,不是在这段配置代码里启动的。
6. 什么时候会“第一次看到”这些线程
通常是在你执行某些 CPU 密集型操作后,比如:
import torch
torch.set_num_threads(8)
x = torch.randn(5000, 5000)
y = torch.randn(5000, 5000)
z = x @ y
执行矩阵乘法时,底层线程可能才真正被创建。
如果你用 htop、top -H、ps -T 去看,往往这时候能看到进程里多出很多线程。
7. 在 vLLM 场景里为什么要提前设
因为 vLLM 会启动多个 worker 进程。
如果不提前限制,每个 worker 后续执行 CPU 相关操作时,底层 OpenMP 可能都想开很多线程。
比如:
- 4 个 worker
- 每个 worker 默认开 16 个 OpenMP 线程
那总共就可能有 64 个线程在抢 CPU。
所以这里先设置:
OMP_NUM_THREADS
torch.set_num_threads(...)
是为了让将来底层算子启动 OpenMP 线程时,不要开太多。
8. 更准确一点:OpenMP 线程不一定每次都“重新启动”
很多 OpenMP runtime 会维护线程池。
所以更准确地说:
- 第一次进入并行区域时,可能创建线程
- 之后再次执行并行区域时,可能复用已有线程
- 不一定每次计算都重新创建/销毁
因此你也可以理解为:
OMP_NUM_THREADS控制的是 OpenMP 线程池规模上限,而不是每次都现开多少个 Python 线程。
9. 一句话回答
OpenMP 线程在哪里启动的?
不是在这段 Python 代码里启动的,而是在 PyTorch / MKL / oneDNN 等底层 C/C++ 库执行 CPU 并行算子、进入 OpenMP 并行区域时,由 OpenMP runtime 自动创建或复用的。
如果你愿意,我还可以进一步讲:
torch.set_num_threads()和OMP_NUM_THREADS谁优先- PyTorch 到底是用 OpenMP、MKL 还是 oneDNN
- 怎么实际观察一个 Python 进程里 OpenMP 线程是不是被创建了
这段代码主要解决 vLLM 多进程(GPU worker)场景下 CPU 资源分配与线程竞争的问题。核心目标有两个:
- 强制使用
spawn多进程启动方式(通过调用前面定义的_maybe_force_spawn(),避免fork带来的 CUDA/XPU 状态问题)。 - 动态调整每个 worker 进程的 OpenMP 线程数(
OMP_NUM_THREADS),防止在容器或受限环境中因线程过多导致 CPU 过度竞争,影响推理性能。
下面拆解具体逻辑:
一、获取容器分配的 CPU 核心数(cgroups 限制)
get_cpu_cores_via_cgroups() -> int | None
- 兼容 cgroup v1:读取
/sys/fs/cgroup/cpu/cpu.cfs_period_us和cpu.cfs_quota_us,用ceil(quota / period)算出可用的 CPU 核心数。 - 兼容 cgroup v2:读取
/sys/fs/cgroup/cpu.max,解析格式如"200000 100000"或"max 100000"。若为"max"表示无限制,返回物理 CPU 总数;否则同样用ceil(quota / period)计算。
该函数返回容器允许使用的 CPU 核心数(若未限制则返回 None)。
二、多进程 worker 环境配置 set_multiprocessing_worker_envs
输入 parallel_config(包含 world_size,即并行的 GPU worker 数量)。流程如下:
-
调用
_maybe_force_spawn()
确保所有 worker 子进程都使用spawn方式启动(前面已解释过其作用)。 -
计算每个 worker 合适的 OpenMP 线程数
- 获取当前 PyTorch 默认线程数:
torch.get_num_threads()(通常等于物理 CPU 核心数)。 - 如果 cgroup 限制了 CPU 核心数
cgroup_cpu_cores,则取min(默认线程数, cgroup_cpu_cores)作为可用总线程数;否则用默认值。 - 将总线程数除以
world_size(并向上取整)得到 每个 worker 应使用的线程数:default_omp_num_threads = max(1, 总线程数 // world_size)
- 获取当前 PyTorch 默认线程数:
-
调整
OMP_NUM_THREADS
如果环境变量中没有设置OMP_NUM_THREADS,且当前 PyTorch 的线程数大于上面计算的合理值,则:- 记录警告日志,提示减少线程数以避免 CPU 争用。
- 设置
os.environ["OMP_NUM_THREADS"] = str(default_omp_num_threads)。 - 调用
torch.set_num_threads(default_omp_num_threads)立即生效。
为什么需要这些操作?
- 在 GPU 推理中,每个 worker 进程除了使用 GPU,也会在 CPU 上执行一些预处理/后处理或调度逻辑。如果每个 worker 都开启与 CPU 核心数相同的线程(例如 32 核开启 32 线程),而
world_size=8时,总线程数会暴涨到 256,造成严重的 CPU 上下文切换和争用,反而降低性能。 - 在容器环境中,用户可能只分配了 4 个 CPU 核心,但 PyTorch 默认仍会读取宿主机的 32 个核心,导致线程被频繁挂起或节流。通过读取 cgroups 可以真实反映可用资源,避免超配。
spawn强制使用:因为在 Ray actor 中或 CUDA 初始化后使用fork会导致子进程无法正确继承 GPU 上下文或死锁,必须强制spawn。
总结:这段代码是 vLLM 为多进程 GPU worker 做的资源感知与自适应调优,保证在各类环境(物理机、容器、Ray 集群)下都能避免 CPU 竞争,稳定高效运行。
openmp 线程在哪里启动的?

OpenMP 线程并不是由 Python 代码显式启动的,而是由底层的 OpenMP 运行时库(如 libgomp、libiomp5)在需要时自动创建和管理。下面是具体的启动时机和机制:
1. 线程创建时机:第一次进入并行区域
OpenMP 采用延迟创建策略。线程池通常不是在程序启动时创建,而是在首次执行并行构造(例如 #pragma omp parallel)时才真正启动。此后,线程池会一直存在,供后续并行区域复用。
在 PyTorch 中,常见的会触发 OpenMP 并行计算的算子包括:
- 矩阵乘法(
torch.mm,torch.bmm,torch.matmul) - 卷积(
torch.nn.functional.conv2d) - 逐元素运算(
torch.add等,当张量足够大时) - 归约操作(
torch.sum等)
示例:
当你第一次执行一个大的 a @ b 时,PyTorch 底层会调用 OpenMP 版本的矩阵乘法实现,此时 OpenMP 运行时检测到 OMP_NUM_THREADS 的值(或默认值),并创建相应数量的工作线程。
2. torch.set_num_threads 的作用
torch.set_num_threads(n) 会设置 PyTorch 内部线程池的大小。它主要做两件事:
- 设置 OpenMP 的
num_threads默认值(通过omp_set_num_threads)。 - 如果 PyTorch 编译时使用了 TBB 或 native 线程池,也会调整对应后端。
但注意:调用 set_num_threads 本身并不会立即启动任何线程。它只是修改了一个配置值,真正的线程仍然在第一次并行操作时才创建。
3. 环境变量 OMP_NUM_THREADS 的作用
OMP_NUM_THREADS 是一个标准环境变量,OpenMP 运行时会在程序启动时读取它作为默认线程数。如果代码中没有调用 omp_set_num_threads 覆盖,就使用这个值。
因此,在前面的代码片段中:
os.environ["OMP_NUM_THREADS"] = str(default_omp_num_threads)
torch.set_num_threads(default_omp_num_threads)
- 设置环境变量是为了子进程继承(因为
spawn方式启动的子进程会重新读取环境变量)。 - 同时调用
torch.set_num_threads让当前进程立即生效。
4. 实际线程创建位置(开发者视角)
如果你想在源码级别找到线程创建点,可以关注:
- OpenMP 库初始化:例如
libgomp中gomp_team_start函数会在第一次并行区域被调用时创建线程。 - PyTorch 的
at::init_num_threads:会调用at::set_num_threads,但依然不创建线程,只是设置值。
总结
- 线程启动时机:第一次执行需要 CPU 并行计算的算子(如大矩阵乘)时。
- 启动位置:在 OpenMP 运行时的
parallel区域入口,例如 PyTorch 调用的ATen/Parallel.h中的parallel_for。 - 控制方式:通过
OMP_NUM_THREADS环境变量或torch.set_num_threads指定线程数,但不能直接控制线程的启动时刻。
如果你的实际问题是“我想在代码中明确看到线程创建的过程”,那么几乎不可能在纯 Python 层捕捉到——它发生在编译好的二进制库内部,由 OpenMP 运行时透明管理。
CPU worker啥时候启动的,和GPU worker区别?两者会同时启动吗? openmp 线程这个线程会和vllm 中线程竞争用cPU吗
openmp 线程这个线程会和vllm 中线程竞争用cPU吗 vllm 中使用GPU worker时,还要用到openmp 嘛
更多推荐


所有评论(0)