vLLM V1 Worker 超深度架构分析 — Part 1: 架构总览与核心类

分析目标目录: github.com/vllm/vllm/v1/worker
代码总量: ~25,000 行 Python | 70+ 源文件


目录

  1. 模块定位与业务职责
  2. 系统架构总览
  3. Worker 类层次体系
  4. WorkerBase 基类深度分析
  5. GPUWorker 深度分析
  6. CPUWorker / XPUWorker 深度分析
  7. Workspace 工作空间管理
  8. BlockTable 块表管理
  9. CUDA Graph 机制
  10. 并行策略工具
  11. 通用工具函数

一、模块定位与业务职责

1.1 业务定位

vLLM V1 Worker 是整个推理引擎的执行层,负责:

职责 说明
模型推理执行 接收调度器下发的请求批次,执行模型前向传播
KV Cache 管理 管理显存中的 KV Cache 块分配、释放、换入换出
采样与输出 对 logits 进行采样(贪心/随机/beam),生成 token
推测解码 集成 EAGLE/Medusa/n-gram 等投机采样加速方案
多模态处理 处理图像/音频/视频等多模态输入的编码
分布式协调 Tensor Parallel / Pipeline Parallel / Data Parallel 协调
CUDA Graph 优化 捕获和重放 CUDA Graph 减少内核启动开销

1.2 在系统中的位置

vLLM V1 系统架构

Worker 层 (本次分析范围)

WorkerBase
抽象基类

API Server
HTTP/gRPC 接口层

Frontend
请求解析 & 队列管理

Scheduler V1
调度器 - 批次构建 & 抢占

GPUWorker
GPU 推理执行器

CPUWorker
CPU 推理执行器

XPUWorker
XPU 推理执行器

GPUModelRunner
核心模型执行引擎

InputBatch
输入批次管理

Sampler
采样器

SpecDecoder
推测解码器

MM Encoder
多模态编码器

Model Weights
模型权重 (GPU/CPU)

KV Cache
键值缓存 (GPU)

GPU Hardware
CUDA / NCCL

1.3 数据流入流出

数据流出

Worker 处理

数据流入

SchedulerOutput
调度输出

NewRequestData
新请求数据

ResumedData
恢复请求数据

Worker.execute_model()

ModelRunner.execute_model()

model.forward()

Sampler()

ModelRunnerOutput
模型运行结果

sampled_token_ids
采样token ID

logprobs
对数概率


二、系统架构总览

2.1 目录结构

v1/worker/
├── __init__.py                          # 空包初始化
├── worker_base.py                       # [344行] Worker抽象基类
├── gpu_worker.py                        # [1160行] GPU Worker实现
├── cpu_worker.py                        # [228行] CPU Worker实现
├── xpu_worker.py                        # [119行] XPU Worker实现
├── gpu_model_runner.py                  # [7174行] ⭐核心 GPU模型运行器
├── gpu_input_batch.py                   # [1121行] GPU输入批次状态管理
├── gpu_ubatch_wrapper.py                # [510行] 微批次包装器
├── ubatch_utils.py                      # [265行] 微批次工具
├── ubatching.py                         # [241行] 微批次调度逻辑
├── block_table.py                       # [380行] 块表数据结构
├── workspace.py                         # [279行] 工作空间管理
├── utils.py                             # [542行] 通用工具函数
├── dp_utils.py                          # [223行] 数据并行工具
├── cp_utils.py                          # [58行] 上下文并行工具
├── mamba_utils.py                       # [273行] Mamba状态管理
├── kv_connector_model_runner_mixin.py   # [283行] KV连接器Mixin
├── ec_connector_model_runner_mixin.py   # [78行] EC连接器Mixin
├── lora_model_runner_mixin.py           # [288行] LoRA Mixin
├── encoder_cudagraph.py                 # [606行] 编码器CUDA Graph
├── encoder_cudagraph_defs.py            # [68行] 编码器CUDA Graph定义
├── cpu_model_runner.py                  # [257行] CPU模型运行器
├── xpu_model_runner.py                  # [55行] XPU模型运行器
├── tpu_input_batch.py                   # [574行] TPU输入批次
│
├── gpu/                                 # GPU特定子模块
│   ├── model_runner.py                  # [1428行] GPU模型运行器(子类)
│   ├── input_batch.py                   # [588行] GPU输入批次
│   ├── block_table.py                   # [285行] GPU块表
│   ├── attn_utils.py                    # [372行] 注意力工具
│   ├── buffer_utils.py                  # [217行] 缓冲区管理
│   ├── async_utils.py                   # [120行] 异步工具
│   ├── cudagraph_utils.py               # [459行] CUDA Graph工具
│   ├── states.py                        # [141行] GPU状态
│   ├── warmup.py                        # [153行] 预热逻辑
│   ├── structured_outputs.py            # [115行] 结构化输出
│   ├── kv_connector.py                  # [132行] KV连接器
│   ├── lora_utils.py                    # [44行] LoRA工具
│   ├── eplb_utils.py                    # [141行] 专家并行负载均衡
│   ├── dp_utils.py                      # [117行] 数据并行工具
│   ├── cp_utils.py                      # [61行] 上下文并行工具
│   ├── pp_utils.py                      # [41行] 流水线并行工具
│   ├── shutdown.py                      # [20行] 关闭处理
│   ├── metrics/logits.py                # [42行] logits指标
│   ├── mm/                              # 多模态子模块
│   │   ├── encoder_cache.py             # [40行] 编码器缓存
│   │   ├── encoder_runner.py            # [148行] 编码器运行器
│   │   └── rope.py                      # [197行] 旋转位置编码
│   ├── model_states/                    # 模型状态子模块
│   │   ├── interface.py                 # [96行] 状态接口
│   │   ├── default.py                   # [199行] 默认状态
│   │   ├── mamba_hybrid.py              # [150行] Mamba混合状态
│   │   └── whisper.py                   # [204行] Whisper状态
│   ├── pool/                            # 池化子模块
│   │   ├── pooling_runner.py            # [46行] 池化运行器
│   │   └── late_interaction_runner.py   # [166行] 迟交互运行器
│   ├── sample/                          # 采样子模块
│   │   ├── sampler.py                   # [197行] 采样器
│   │   ├── states.py                    # [104行] 采样状态
│   │   ├── output.py                    # [15行] 采样输出
│   │   ├── logprob.py                   # [250行] 对数概率
│   │   ├── prompt_logprob.py            # [236行] 提示对数概率
│   │   ├── penalties.py                 # [310行] 惩罚项
│   │   ├── logit_bias.py                # [280行] logit偏置
│   │   ├── min_p.py                     # [60行] min-p采样
│   │   ├── gumbel.py                    # [211行] Gumbel采样
│   │   └── bad_words.py                 # [194行] 坏词过滤
│   └── spec_decode/                     # 推测解码子模块
│       ├── eagle/speculator.py          # [887行] EAGLE推测器
│       ├── eagle/cudagraph.py           # [123行] EAGLE CUDA Graph
│       ├── eagle/utils.py               # [67行] EAGLE工具
│       ├── eagle/eagle3_utils.py        # [46行] EAGLE3工具
│       ├── rejection_sampler.py         # [163行] 拒绝采样器
│       ├── probabilistic_rejection_sampler_utils.py  # [642行] 概率拒绝采样
│       ├── synthetic_rejection_sampler_utils.py      # [95行] 合成拒绝采样
│       └── utils.py                     # [47行] 推测解码工具

2.2 模块依赖关系图

Worker 层

外部依赖

gpu/

vllm.config
VllmConfig

vllm.v1.core.sched
SchedulerOutput

vllm.model_executor
模型执行器

vllm.v1.attention
注意力后端

vllm.v1.sample
采样器

worker_base

gpu_worker

gpu_model_runner

gpu_input_batch

gpu_ubatch_wrapper

block_table

workspace

utils

gpu/model_runner

gpu/input_batch

gpu/block_table

gpu/attn_utils

gpu/buffer_utils

gpu/cudagraph_utils

gpu/sample/...

gpu/spec_decode/...

gpu/mm/...

gpu/model_states/...

2.3 核心组件交互全景图

输出

KV Cache 管理

GPUModelRunner

GPUWorker

Scheduler (调度器)

推测解码

采样管线

输入准备

SchedulerOutput

execute_model()

sleep() / wake_up()

coordinate_batch_across_dp()

update_from_schedule()

_prepare_inputs()

model.forward()

sample_tokens()

CUDA Graph 重放

InputBatch
批次状态管理

attn_utils
注意力元数据

mm/encoder_runner
多模态编码

Sampler

logprob

penalties

structured_outputs

EAGLE Speculator

RejectionSampler

BlockTable
块表映射

KVConnector
跨设备传输

KVBlockZeroer
块清零

ModelRunnerOutput


三、Worker 类层次体系

3.1 类继承图

model_runner

model_runner

model_runner

«abstract»

WorkerBase

+model_runner: ModelRunnerBase

+vllm_config: VllmConfig

+local_cache: str|None

+log_stats: bool

+_is_sleeping: bool

+init_device()

+determine_available_memory() : long

+initialize_cache(num_gpu_blocks, num_cpu_blocks)

+execute_model(SchedulerOutput) : ModelRunnerOutput

+is_sleeping() : bool

+sleep()

+wake_up(tags)

+check_health() : bool

GPUWorker

-model_runner: GPUModelRunner

-kv_cache: list<Tensor>

-local_cache: LocalCache|None

-engine_index: int

-is_driver_worker: bool

-input_register: Tensor|None

+init(vllm_config, local_cache, log_stats, engine_index)

+init_device()

+determine_available_memory() : long

+initialize_cache(num_gpu_blocks, num_cpu_blocks)

+execute_model(scheduler_output) : ModelRunnerOutput

+sleep()

+wake_up(tags)

+check_health() : bool

+add_logger(logger_name, logger)

+remove_logger(logger_name)

CPUWorker

-model_runner: CPUModelRunner

-kv_cache: list<Tensor>

-engine_index: int

+init(vllm_config, ...)

+init_device()

+determine_available_memory() : long

+initialize_cache(num_gpu_blocks, num_cpu_blocks)

+execute_model(scheduler_output) : ModelRunnerOutput

XPUWorker

-model_runner: XPUModelRunner

-kv_cache: list<Tensor>

-engine_index: int

+init(vllm_config, ...)

+init_device()

+determine_available_memory() : long

+initialize_cache(num_gpu_blocks, num_cpu_blocks)

+execute_model(scheduler_output) : ModelRunnerOutput

GPUModelRunner

-vllm_config: VllmConfig

-device: torch.device

-model: nn.Module

-sampler: Sampler

-input_batch: InputBatch

-kv_cache: dict<str, Tensor>

-attn_backend: AttentionBackend

-attn_metadata_builder: AttentionMetadataBuilder

-cudagraph_dispatcher: CudagraphDispatcher

-spec_decode_proposer: Proposer

-encoder_cache: EncoderCache

+init(vllm_config, device)

+load_model()

+initialize_kv_cache(kv_cache_configs)

+update_from_schedule(scheduler_output)

+execute_model(scheduler_output) : ExecuteModelState|None

+sample_tokens(state) : ModelRunnerOutput

+capture_cudagraphs()

+warm_up_model()

LoRAModelRunnerMixin

+lora_manager: LoRAManager

+set_active_loras(requests)

+add_lora(lora_request)

+remove_lora(lora_id)

KVConnectorModelRunnerMixin

+kv_connector: KVConnectorBase

+save_kv_blocks_scheduler_output()

+load_kv_blocks_scheduler_output()

ECConnectorModelRunnerMixin

+ec_connector: ECConnector

+save_ec_connector_output()

CPUModelRunner

XPUModelRunner

3.2 Mixin 组合模式

GPUModelRunner 使用 Python 的 Mixin 多继承模式来组合不同功能:

核心

Mixin 层

LoRAModelRunnerMixin
LoRA适配器管理

KVConnectorModelRunnerMixin
KV缓存跨设备传输

ECConnectorModelRunnerMixin
编码器-解码器连接

GPUModelRunner
模型推理核心

3.3 Worker 与 ModelRunner 的委托关系

nn.Module GPUModelRunner GPUWorker Engine nn.Module GPUModelRunner GPUWorker Engine Worker 委托 ModelRunner 执行推理 Worker 只做协调 ModelRunner 做实际工作 execute_model(scheduler_output) execute_model(scheduler_output) update_from_schedule() _prepare_inputs() forward(input_ids, positions, kv_cache, attn_metadata) hidden_states / logits sample_tokens() ModelRunnerOutput ModelRunnerOutput

四、WorkerBase 基类深度分析

4.1 源码逐行分析

文件: worker_base.py (344行)

# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project

"""
WorkerBase - 所有 Worker 实现的抽象基类。
定义了推理引擎执行层的标准接口,由 GPUWorker/CPUWorker/XPUWorker 具体实现。
"""

import functools
from abc import ABC, abstractmethod
from typing import Optional

import torch

from vllm.config import VllmConfig, set_current_vllm_config
from vllm.logger import init_logger
from vllm.v1.core.sched.output import SchedulerOutput  # 调度器输出
from vllm.v1.outputs import ModelRunnerOutput           # 模型运行结果

logger = init_logger(__name__)

类定义与构造函数:

class WorkerBase(ABC):
    """Worker 基类 - 定义推理执行层的统一接口。
    
    每个 Worker 持有一个 ModelRunner 实例,负责:
    1. 设备初始化 (GPU/CPU/XPU)
    2. 模型加载与 KV Cache 分配
    3. 接收调度器输出,执行模型推理
    4. 返回采样结果
    
    设计模式: 模板方法模式 (Template Method)
    - 基类定义 execute_model() 的骨架流程
    - 子类实现 init_device() / determine_available_memory() 等具体步骤
    """
    
    def __init__(
        self,
        vllm_config: VllmConfig,         # vLLM 全局配置对象
        local_cache: Optional[str] = None, # 本地缓存路径
        log_stats: bool = False,           # 统计日志开关
    ):
        # 保存全局配置引用 - 所有子模块通过此引用获取配置
        self.vllm_config = vllm_config
        # 本地缓存路径 - 避免重复下载模型权重
        self.local_cache = local_cache
        # 统计日志开关
        self.log_stats = log_stats
        # 睡眠状态标志 - 用于 CPU offloading 场景
        self._is_sleeping = False

init_device() - 设备初始化(抽象方法):

    @abstractmethod
    def init_device(self) -> None:
        """初始化计算设备。
        
        GPUWorker: 设置 CUDA 设备、初始化 NCCL 通信组
        CPUWorker: 设置 CPU 线程数、内存分配策略
        XPUWorker: 设置 Intel XPU 设备、oneAPI 通信
        
        此方法在 Worker 生命周期中只调用一次。
        """
        raise NotImplementedError

determine_available_memory() - 可用显存计算(抽象方法):

    @abstractmethod
    def determine_available_memory(self) -> int:
        """计算可用于 KV Cache 的显存量(字节数)。
        
        计算逻辑(GPUWorker 为例):
        1. 总 GPU 显存
        2. 减去模型权重占用
        3. 减去 CUDA Graph 占用
        4. 减去临时缓冲区占用
        5. 剩余即为 KV Cache 可用空间
        
        Returns:
            int: 可用显存字节数
        """
        raise NotImplementedError

initialize_cache() - KV Cache 初始化(抽象方法):

    @abstractmethod
    def initialize_cache(
        self,
        num_gpu_blocks: int,  # GPU 上分配的 KV Cache 块数
        num_cpu_blocks: int,  # CPU 上分配的 KV Cache 块数(用于 swap)
    ) -> None:
        """根据调度器计算出的块数,分配 KV Cache 显存。
        
        此方法在 determine_available_memory() 之后调用,
        调度器根据可用显存计算出能分配多少块,然后调用此方法执行实际分配。
        """
        raise NotImplementedError

execute_model() - 模型推理执行(模板方法):

    def execute_model(
        self,
        scheduler_output: SchedulerOutput,
    ) -> ModelRunnerOutput:
        """执行一次模型推理 - Worker 的核心入口方法。
        
        执行流程:
        1. 设置当前 vllm_config 上下文
        2. 调用 model_runner.execute_model() 执行前向传播
        3. 如果返回 ExecuteModelState(需要采样),则继续采样
        4. 返回 ModelRunnerOutput
        """
        with set_current_vllm_config(self.vllm_config):
            result = self.model_runner.execute_model(scheduler_output)
            
            # execute_model() 可能返回:
            # 1. None - 无需执行(空批次或仅 cache 操作)
            # 2. ExecuteModelState - 前向传播完成,需要采样
            # 3. ModelRunnerOutput - 直接返回结果(pooling 模型等)
            if result is None:
                return None
            if isinstance(result, ModelRunnerOutput):
                return result
            # ExecuteModelState: 前向传播完成但采样在下一步
            return self.model_runner.sample_tokens(result)

sleep() / wake_up() - 睡眠/唤醒:

    def is_sleeping(self) -> bool:
        """检查 Worker 是否处于睡眠状态。"""
        return self._is_sleeping

    def sleep(self) -> None:
        """将 Worker 置于睡眠状态 - 释放 GPU 资源。
        用于 CPU offloading 场景:当 GPU 显存紧张时,将模型权重卸载到 CPU。
        """
        self._is_sleeping = True

    def wake_up(self, tags: Optional[list[str]] = None) -> None:
        """唤醒 Worker - 将模型权重重新加载到 GPU。
        Args:
            tags: 唤醒标签,用于指定需要加载哪些层(部分唤醒优化)
        """
        self._is_sleeping = False

check_health() - 健康检查:

    def check_health(self) -> bool:
        """检查 Worker 是否健康。
        返回 False 的情况: GPU 掉线、NCCL 通信超时、内存溢出
        引擎定期调用此方法监控 Worker 状态。
        """
        return True  # 基类默认健康,子类可覆盖

4.2 WorkerBase 方法调用流程

nn.Module ModelRunner WorkerBase Engine nn.Module ModelRunner WorkerBase Engine 初始化阶段 GPUWorker: 设置CUDA设备+NCCL 计算KV Cache可用显存 分配KV Cache显存 推理阶段 loop [每个 step] 健康检查 init_device() determine_available_memory() initialize_cache(num_gpu, num_cpu) execute_model(scheduler_output) execute_model(scheduler_output) update_from_schedule() forward(input_ids, positions, kv_cache, attn_metadata) hidden_states sample_tokens() [采样] ModelRunnerOutput ModelRunnerOutput check_health() True/False

4.3 WorkerBase 生命周期状态机

init()

init_device()

determine_available_memory()

initialize_cache()

execute_model()

前向传播完成

返回结果

sleep()

wake_up()

check_health()=False

Created

DeviceInit

MemoryProfiled

CacheReady

Executing

Sampling

Sleeping

Unhealthy

配置已保存
设备未初始化

CUDA设备已设置
NCCL通信已建立

KV Cache已分配
模型已加载
CUDA Graph已捕获

模型权重已卸载到CPU
GPU显存已释放


五、GPUWorker 深度分析

5.1 概述

GPUWorker 是 vLLM 的主力推理执行器,1160 行代码,覆盖了 GPU 推理的完整生命周期。

5.2 源码逐行分析

文件: gpu_worker.py (1160行)

# SPDX-License-Identifier: Apache-2.0

"""
GPUWorker - GPU 推理引擎的 Worker 实现。
负责 GPU 设备管理、模型加载、KV Cache 管理、分布式协调和推理执行。
"""

import os
import time
from typing import TYPE_CHECKING, Optional

import torch
import torch.distributed

from vllm.config import VllmConfig, set_current_vllm_config
from vllm.distributed.parallel_state import (
    get_pp_group,          # 流水线并行通信组
    get_tp_group,          # 张量并行通信组
    is_global_first_rank,  # 是否全局第一个rank(用于日志/监控)
)
from vllm.distributed.utils import StatelessProcessGroup
from vllm.logger import init_logger
from vllm.utils import get_ip, get_open_port
from vllm.v1.core.sched.output import SchedulerOutput
from vllm.v1.outputs import ModelRunnerOutput
from vllm.v1.worker.gpu_model_runner import GPUModelRunner
from vllm.v1.worker.worker_base import WorkerBase

构造函数 __init__():

class GPUWorker(WorkerBase):
    """GPU Worker - 在 GPU 上执行 LLM 推理的 Worker 实现。
    
    关键职责:
    1. 管理 GPU 设备 (CUDA 设备设置、NCCL 通信初始化)
    2. 通过 GPUModelRunner 执行模型推理
    3. 分配和管理 KV Cache 显存
    4. 支持模型权重 CPU offloading (sleep/wake_up)
    5. 支持动态 LoRA 加载
    6. 支持分布式推理 (TP/PP/DP/CP/EP)
    """

    def __init__(
        self,
        vllm_config: VllmConfig,         # 全局配置
        local_cache: Optional[str] = None, # 本地缓存路径
        log_stats: bool = False,           # 统计日志
        engine_index: int = 0,             # 引擎索引(多引擎场景)
    ):
        super().__init__(
            vllm_config=vllm_config,
            local_cache=local_cache,
            log_stats=log_stats,
        )
        
        # 引擎索引 - Data Parallel 场景下标识当前 Worker 属于哪个引擎
        # 索引 0 的是 driver worker(负责调度协调)
        self.engine_index = engine_index
        
        # 是否为 driver worker(rank 0)
        # 只有 driver worker 执行调度和采样
        self.is_driver_worker = is_global_first_rank()
        
        # 初始化 GPUModelRunner - Worker 的核心执行引擎
        self.model_runner = GPUModelRunner(
            vllm_config=self.vllm_config,
            device=torch.device(f"cuda:{torch.cuda.current_device()}"),
        )
        
        # KV Cache 张量列表 - 由 initialize_cache() 分配
        self.kv_cache: list[torch.Tensor] = []
        
        # 输入注册张量 - 用于异步调度场景下的输入传递
        self.input_register: Optional[torch.Tensor] = None

init_device() - GPU 设备初始化:

    def init_device(self) -> None:
        """初始化 GPU 设备和分布式通信环境。
        
        执行步骤:
        1. 设置 CUDA 设备
        2. 初始化 NCCL 通信组 (TP/PP)
        3. 设置随机种子(保证分布式场景下确定性)
        4. 清空 CUDA 缓存(预热 CUDA context)
        """
        # 步骤1: 设置 CUDA 设备
        # 每个进程绑定到对应的 GPU,避免多进程竞争
        torch.cuda.set_device(
            torch.distributed.get_rank() % torch.cuda.device_count()
        )
        self.device = torch.device(f"cuda:{torch.cuda.current_device()}")
        
        # 步骤2: 清空 CUDA 缓存 - 触发 CUDA context 初始化
        torch.cuda.empty_cache()
        
        # 步骤3: 设置随机种子
        seed = self.vllm_config.model_config.seed
        torch.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
        
        logger.info(
            "Initializing GPUWorker with device=%s, rank=%d",
            self.device,
            torch.distributed.get_rank(),
        )

determine_available_memory() - 可用显存计算:

    def determine_available_memory(self) -> int:
        """计算可用于 KV Cache 的 GPU 显存量(字节)。
        
        计算公式:
        可用显存 = 总 GPU 显存
                   - 模型权重占用
                   - 临时缓冲区占用
                   - CUDA Graph 占用
                   - 安全裕量
        
        此方法通过 ModelRunner 的 profile_run() 来精确测量,
        而不是简单估算。
        
        Returns:
            int: 可用于 KV Cache 的字节数
        """
        with set_current_vllm_config(self.vllm_config):
            available_memory = self.model_runner.determine_available_memory()
        return available_memory

initialize_cache() - KV Cache 分配:

    def initialize_cache(
        self,
        num_gpu_blocks: int,  # GPU KV Cache 块数
        num_cpu_blocks: int,  # CPU KV Cache 块数(swap 用)
    ) -> None:
        """分配 KV Cache 显存。
        
        KV Cache 的组织方式:
        - 每个 Transformer 层有一对 K 和 V 缓存
        - 缓存按块(block)管理,每个块存储固定数量的 token
        - 块大小由 cache_config.block_size 决定(通常 16)
        
        分配逻辑:
        1. 计算每个块的字节大小
        2. 分配 num_gpu_blocks 个块的 GPU 显存
        3. 分配 num_cpu_blocks 个块的 CPU 内存(用于 swap)
        4. 将分配的缓存绑定到 ModelRunner
        """
        assert num_gpu_blocks > 0
        
        with set_current_vllm_config(self.vllm_config):
            self.model_runner.initialize_kv_cache(
                num_gpu_blocks=num_gpu_blocks,
                num_cpu_blocks=num_cpu_blocks,
            )

execute_model() - 推理执行(核心方法):

    def execute_model(
        self,
        scheduler_output: SchedulerOutput,
    ) -> Optional[ModelRunnerOutput]:
        """执行一次模型推理 - GPUWorker 的核心入口。
        
        执行流程:
        1. 检查 Worker 是否在睡眠状态 -> 需要先 wake_up
        2. 协调 Data Parallel 的批次同步
        3. 调用 ModelRunner.execute_model() 执行前向传播
        4. 如果需要采样,执行采样
        5. 返回结果
        
        Args:
            scheduler_output: 调度器输出,包含:
                - scheduled_new_reqs: 新调度请求列表
                - scheduled_cached_reqs: 缓存命中请求列表
                - num_scheduled_tokens: 每个 request 调度的 token 数
                - blocks_to_swap_in/out: KV Cache swap 操作
                - blocks_to_copy: KV Cache copy-on-write 操作
        """
        # 检查睡眠状态
        if self.is_sleeping():
            self.wake_up()
        
        with set_current_vllm_config(self.vllm_config):
            # Data Parallel 协调
            if self.vllm_config.parallel_config.data_parallel_size > 1:
                scheduler_output = coordinate_batch_across_dp(
                    scheduler_output, self.vllm_config
                )
            
            # 委托给 ModelRunner 执行
            result = self.model_runner.execute_model(scheduler_output)
            
            if result is None:
                return None
            
            # 如果返回 ExecuteModelState,需要执行采样
            if hasattr(result, 'logits'):
                return self.model_runner.sample_tokens(result)
            
            return result

sleep() / wake_up() - CPU Offloading:

    def sleep(self) -> None:
        """将模型权重卸载到 CPU,释放 GPU 显存。"""
        super().sleep()
        # 通知 ModelRunner 卸载模型权重
        self.model_runner.sleep()

    def wake_up(self, tags: Optional[list[str]] = None) -> None:
        """将模型权重重新加载到 GPU。"""
        super().wake_up(tags)
        # 通知 ModelRunner 重新加载权重
        self.model_runner.wake_up(tags)

check_health() - GPU 健康检查:

    def check_health(self) -> bool:
        """检查 GPU Worker 是否健康。
        
        检查内容:
        1. CUDA 设备是否可用
        2. NCCL 通信是否正常
        3. GPU 显存是否溢出
        """
        try:
            # 尝试在 GPU 上执行简单操作
            torch.cuda.current_stream().synchronize()
            return True
        except RuntimeError:
            return False

5.3 GPUWorker 完整生命周期

创建 GPUWorker

init_device()

load_model()

initialize_cache()

execute_model()

前向传播完成

返回结果

sleep()

wake_up()

shutdown

Init

DeviceReady

ModelLoaded

CacheReady

Executing

Sampling

Sleeping

设置配置参数
engine_index, is_driver

CUDA 设备已绑定
NCCL 通信已建立

模型权重已加载
采样器已初始化

KV Cache 已分配
CUDA Graph 已捕获

update_from_schedule()
forward()

5.4 GPUWorker 的 sleep/wake_up 机制

CPU Memory GPU Memory GPUModelRunner GPUWorker Engine CPU Memory GPU Memory GPUModelRunner GPUWorker Engine Sleep 流程 - 释放GPU显存 显存释放,可供其他进程使用 Wake Up 流程 - 恢复GPU显存 显存重新占用 sleep() sleep() 释放模型权重 模型权重卸载到 CPU wake_up(tags) wake_up(tags) 模型权重加载回 GPU 重新预热 CUDA Graph

5.5 GPUWorker 分布式通信架构

GPU 分布式架构

DP Group 1

DP Group 0

GPUWorker(rank=4)

GPUWorker(rank=0)
Driver Worker

NCCL TP AllReduce
同DP组内通信

GPUWorker(rank=1)

GPUWorker(rank=2)

GPUWorker(rank=3)

NCCL DP AllGather
跨DP组通信

GPUWorker(rank=5)

GPUWorker(rank=6)

GPUWorker(rank=7)


六、CPUWorker / XPUWorker 深度分析

6.1 CPUWorker

文件: cpu_worker.py (228行)

class CPUWorker(WorkerBase):
    """CPU Worker - 在 CPU 上执行 LLM 推理。
    
    适用场景:
    - 无 GPU 环境的推理(边缘设备、CI 测试)
    - 模型量化的 CPU 推理(GGUF/GPTQ)
    - 开发调试环境
    
    与 GPUWorker 的区别:
    1. 无 CUDA Graph 支持
    2. 无 NCCL 通信(使用 Gloo 后端)
    3. KV Cache 在 CPU 内存而非 GPU 显存
    4. 无异步输出拷贝
    """
    
    def __init__(self, vllm_config, local_cache=None, log_stats=False, engine_index=0):
        super().__init__(vllm_config, local_cache, log_stats)
        self.engine_index = engine_index
        self.device = torch.device("cpu")
        self.model_runner = CPUModelRunner(vllm_config, self.device)
        self.kv_cache = []

init_device() - CPU 设备初始化:

    def init_device(self) -> None:
        """初始化 CPU 推理设备。
        设置: 1. CPU 线程数(OMP_NUM_THREADS)2. 内存分配策略 3. 随机种子
        """
        # 设置 OpenMP 线程数 - 根据 CPU 核数和 TP size 自动计算
        parallel_config = self.vllm_config.parallel_config
        world_size = parallel_config.tensor_parallel_size
        os.environ["OMP_NUM_THREADS"] = str(max(1, os.cpu_count() // world_size))
        
        # 设置随机种子
        seed = self.vllm_config.model_config.seed
        torch.manual_seed(seed)

6.2 XPUWorker

文件: xpu_worker.py (119行)

class XPUWorker(WorkerBase):
    """XPU Worker - 在 Intel XPU (GPU) 上执行 LLM 推理。
    
    适用场景:
    - Intel Arc GPU / Data Center GPU Max
    - oneAPI SYCL 后端
    
    与 GPUWorker 的区别:
    1. 使用 torch.xpu 而非 torch.cuda
    2. 使用 XCCL 而非 NCCL
    3. 设备名为 "xpu" 而非 "cuda"
    """
    
    def __init__(self, vllm_config, local_cache=None, log_stats=False, engine_index=0):
        super().__init__(vllm_config, local_cache, log_stats)
        self.engine_index = engine_index
        self.device = torch.device("xpu")
        self.model_runner = XPUModelRunner(vllm_config, self.device)
        self.kv_cache = []

6.3 三种 Worker 对比

Worker 类型对比

GPUWorker
CUDA Graph: YES
NCCL TP/PP: YES
异步输出拷贝: YES
LoRA: YES
CPU Offloading: YES
SpecDecode: YES

CPUWorker
CUDA Graph: NO
Gloo TP: YES
异步输出拷贝: NO
LoRA: NO
CPU Offloading: NO
SpecDecode: NO

XPUWorker
CUDA Graph: NO
XCCL TP: YES
异步输出拷贝: NO
LoRA: NO
CPU Offloading: NO
SpecDecode: NO


七、Workspace 工作空间管理

7.1 概述

文件: workspace.py (279行)

Workspace 模块负责管理模型推理过程中的临时工作空间,包括中间张量的分配和释放。使用显存池(Memory Pool)设计模式避免频繁的 cudaMalloc/cudaFree 导致显存碎片化。

7.2 源码逐行分析

import threading
from contextlib import contextmanager
from typing import Optional

import torch

from vllm.config import VllmConfig
from vllm.logger import init_logger

logger = init_logger(__name__)

# 全局锁 - 防止多线程同时修改工作空间
_workspace_lock = threading.Lock()


def lock_workspace():
    """获取工作空间锁的上下文管理器。
    
    用法:
        with lock_workspace():
            # 在此区域内安全修改工作空间
            workspace.allocate(...)
    """
    @contextmanager
    def _lock():
        _workspace_lock.acquire()
        try:
            yield
        finally:
            _workspace_lock.release()
    return _lock()

WorkspaceManager 类:

class WorkspaceManager:
    """工作空间管理器 - 管理推理过程中的临时显存分配。
    
    设计目的:
    - 避免频繁的 cudaMalloc/cudaFree 导致显存碎片化
    - 预分配一大块工作空间,运行时从中切分
    - 推理结束后统一释放,而非逐步释放
    
    这是一种显存池(Memory Pool)设计模式。
    """
    
    def __init__(
        self,
        device: torch.device,
        max_workspace_size: int,  # 最大工作空间字节数
    ):
        self.device = device
        self.max_workspace_size = max_workspace_size
        
        # 预分配工作空间 - 一大块连续显存
        # 使用 torch.empty 而非 torch.zeros,不需要初始化
        self.workspace_tensor = torch.empty(
            max_workspace_size,
            dtype=torch.uint8,
            device=device,
        )
        # 当前偏移量 - 记录已分配的位置
        self._offset = 0
    
    def allocate(
        self,
        size: int,          # 需要分配的字节数
        alignment: int = 256,  # 对齐要求(字节)
    ) -> torch.Tensor:
        """从工作空间中分配指定大小的显存。
        
        分配策略:
        1. 对齐偏移量到 alignment 的整数倍
        2. 检查剩余空间是否足够
        3. 切分出对应大小的子张量
        4. 更新偏移量
        
        Args:
            size: 需要的字节数
            alignment: 内存对齐要求(默认256字节,CUDA DMA 要求)
            
        Returns:
            子张量视图(不拷贝数据)
        """
        # 对齐偏移量
        self._offset = (self._offset + alignment - 1) // alignment * alignment
        
        if self._offset + size > self.max_workspace_size:
            raise RuntimeError(
                f"Workspace exhausted: need {size} bytes at offset "
                f"{self._offset}, but total is {self.max_workspace_size}"
            )
        
        # 切分子张量 - 使用视图操作,零拷贝
        result = self.workspace_tensor[self._offset:self._offset + size]
        self._offset += size
        return result
    
    def reset(self) -> None:
        """重置工作空间偏移量 - 所有分配的子张量失效。
        在每个推理步骤开始时调用。
        """
        self._offset = 0

7.3 工作空间内存布局

重置操作

分配操作

Workspace 内存布局

对齐+切分

offset = 0

Workspace Tensor (预分配)

已分配区域 A
offset=0

已分配区域 B
offset=256

已分配区域 C
offset=512

空闲区域
offset -> max_size

allocate(size=128)

reset()

7.4 lock_workspace 使用场景

调度线程 WorkspaceManager _workspace_lock 推理线程 调度线程 WorkspaceManager _workspace_lock 推理线程 offset: 0 ->> 1024 等待锁释放 offset: 1024 ->> 3072 offset: 3072 ->> 0 acquire() allocate(size=1024) acquire() [阻塞] allocate(size=2048) release() acquire() [成功] reset() release()

八、BlockTable 块表管理

8.1 概述

BlockTable 是 KV Cache 管理的核心数据结构,实现了页式缓存(Paged Attention)的映射表。

8.2 顶层 BlockTable 源码分析

文件: block_table.py (380行)

"""
BlockTable - KV Cache 块表,Paged Attention 的核心数据结构。

设计类比:
- 类似 OS 的页表: 逻辑地址 -> 物理地址映射
- 逻辑地址 = request 的 token 位置
- 物理地址 = GPU 上的 KV Cache 块编号

核心优势:
- 按需分配: 不需要预分配最大序列长度的 KV Cache
- 共享机制: 不同 request 可共享相同前缀的 KV Cache 块(copy-on-write)
- 碎片化消除: 块级管理避免了显存碎片化
"""

from typing import Optional
import numpy as np
import torch
from vllm.config.cache import CacheConfig
from vllm.logger import init_logger
from vllm.utils import cdiv

logger = init_logger(__name__)

BlockTable 类:

class BlockTable:
    """块表 - 管理 KV Cache 的逻辑块到物理块映射。
    
    数据结构:
    - block_table: 2D numpy 数组 [num_reqs, max_num_blocks_per_req]
      - block_table[req_idx, block_idx] = 物理块编号
      - 物理块编号 = -1 表示未分配
    
    每个请求的块表是该请求 KV Cache 的"页表":
    - 第 i 个逻辑块 -> block_table[req_idx, i] 指向的物理块
    - 物理块中存储了 head_dim x block_size 个 token 的 K/V 向量
    """
    
    def __init__(
        self,
        max_num_reqs: int,          # 最大并发请求数
        max_num_blocks_per_req: int, # 每个请求最大块数
        block_size: int,             # 每个块的 token 数(通常16)
        pin_memory: bool = True,     # 是否使用 pinned memory
    ):
        self.max_num_reqs = max_num_reqs
        self.max_num_blocks_per_req = max_num_blocks_per_req
        self.block_size = block_size
        self.pin_memory = pin_memory
        
        # 核心: 2D 块表数组,初始化为 -1(表示未分配)
        self.block_table_np = np.full(
            (max_num_reqs, max_num_blocks_per_req),
            -1,
            dtype=np.int32,
        )
        
        # GPU 版本的块表 - 用于 CUDA 内核访问
        if pin_memory:
            self.block_table = torch.from_numpy(self.block_table_np).pin_memory()
        else:
            self.block_table = torch.from_numpy(self.block_table_np)
    
    def add_row(self, req_index: int, block_ids: list[int]) -> None:
        """为新请求填充块表行。
        Args:
            req_index: 请求在批次中的索引
            block_ids: 调度器分配的物理块 ID 列表
        """
        num_blocks = len(block_ids)
        self.block_table_np[req_index, :num_blocks] = block_ids
    
    def move_row(
        self,
        src_req_index: int,    # 源请求索引
        dst_req_index: int,    # 目标请求索引
    ) -> None:
        """将块表行从 src 移动到 dst。
        场景: 批次重排序(如 decode-prefill 分离排序)
        """
        self.block_table_np[dst_req_index] = self.block_table_np[src_req_index]
    
    def commit(self, num_reqs: int) -> torch.Tensor:
        """将块表的前 num_reqs 行同步到 GPU。
        
        在模型前向传播之前调用,确保 GPU 上的块表数据是最新的。
        
        Args:
            num_reqs: 当前批次中的请求数
            
        Returns:
            GPU 上的块表张量 [num_reqs, max_num_blocks_per_req]
        """
        self.block_table[:num_reqs].copy_(
            torch.from_numpy(self.block_table_np[:num_reqs])
        )
        return self.block_table[:num_reqs]
    
    def append_row(self, req_index: int, block_id: int) -> None:
        """为请求追加一个新的物理块。
        场景: 生成新 token 时需要扩展 KV Cache
        """
        # 找到第一个 -1 的位置,填入新的 block_id
        for i in range(self.max_num_blocks_per_req):
            if self.block_table_np[req_index, i] == -1:
                self.block_table_np[req_index, i] = block_id
                return
    
    def remove_row(self, req_index: int) -> None:
        """清除请求的块表行。
        场景: 请求完成,释放 KV Cache
        """
        self.block_table_np[req_index] = -1

8.3 GPU BlockTable

文件: gpu/block_table.py (285行)

class GPUBlockTable:
    """GPU 优化的块表 - 支持 CUDA Graph 的固定大小块表。
    
    CUDA Graph 要求:
    - 输入张量的大小在捕获和重放时必须一致
    - 因此需要固定大小的块表,不足的部分用 padding 填充
    
    设计:
    - 维护一个固定大小的 GPU 块表张量
    - 使用 NULL_BLOCK_ID (-1) 填充未使用的行/列
    - 支持 cudagraph 模式和 dynamic 模式切换
    """
    
    NULL_BLOCK_ID = -1  # 空块标识
    
    def __init__(
        self,
        num_blocks: int,                    # 总物理块数
        max_num_reqs: int,                   # 最大请求数
        max_num_blocks_per_req: int,         # 每请求最大块数
        block_size: int,                     # 块大小
        pin_memory: bool = True,
        is_cudagraph: bool = False,          # 是否为 CUDA Graph 模式
    ):
        self.num_blocks = num_blocks
        self.max_num_reqs = max_num_reqs
        self.max_num_blocks_per_req = max_num_blocks_per_req
        self.is_cudagraph = is_cudagraph
        
        # CUDA Graph 模式下需要固定大小的张量
        if is_cudagraph:
            self.block_table = torch.full(
                (max_num_reqs, max_num_blocks_per_req),
                self.NULL_BLOCK_ID,
                dtype=torch.int32,
                device="cuda",
            )
        else:
            # 动态模式: 初始为空,运行时根据批次大小调整
            self.block_table = torch.empty(
                (0, 0),
                dtype=torch.int32,
                device="cuda",
            )
    
    def update(
        self,
        block_table_np: np.ndarray,  # numpy 块表数据
        num_reqs: int,                # 当前请求数
    ) -> torch.Tensor:
        """更新 GPU 块表。
        
        将 numpy 块表数据拷贝到 GPU 张量。
        CUDA Graph 模式下: 固定大小,直接拷贝
        动态模式下: 调整大小后拷贝
        """
        if self.is_cudagraph:
            self.block_table[:num_reqs].copy_(
                torch.from_numpy(block_table_np[:num_reqs])
            )
        else:
            self.block_table = torch.from_numpy(
                block_table_np[:num_reqs]
            ).to("cuda")
        return self.block_table[:num_reqs]

8.4 块表在 Paged Attention 中的作用

物理 KV Cache 块 GPU

块表映射 BlockTable

请求的 KV Cache 视角

逻辑块0-物理5
逻辑块1-物理3

逻辑块0-物理5
逻辑块1-物理7

Request 1
tokens: A B C D E F

Request 2
tokens: A B G H

block_table 0 = 5, 3, -1
block_table 1 = 5, 7, -1

Block 3
KV for D E F

Block 5
KV for A B C D
共享块

Block 7
KV for G H pad pad

8.5 块表操作流程

新请求到达

调度器分配物理块

BlockTable.add_row(req_idx, block_ids)

请求生成新 token

调度器分配新块

BlockTable.append_row(req_idx, new_block_id)

批次重排序

BlockTable.move_row(src, dst)

请求完成

调度器释放物理块

BlockTable.remove_row(req_idx)

前向传播前

BlockTable.commit(num_reqs)

GPU 块表就绪 -> 传给 Attention kernel

8.6 Slot Mapping 计算详解

KV Cache 存储视图

Token位置到KV Cache物理位置的映射

slot 80

slot 81

slot 48

Token 0
请求0, 位置0

Token 1
请求0, 位置1

Token 16
请求0, 位置16

Slot = block_0 * 16 + 0
= 5 * 16 + 0 = 80

Slot = block_0 * 16 + 1
= 5 * 16 + 1 = 81

Slot = block_1 * 16 + 0
= 3 * 16 + 0 = 48

KV Cache Tensor
[num_blocks, 2, num_heads, head_dim, block_size]

Block 5, offset 0

Block 5, offset 1

Block 3, offset 0


九、CUDA Graph 机制

9.1 概述

CUDA Graph 是 vLLM 的关键性能优化技术。通过捕获一次完整的 GPU 执行流程(包含多个 kernel launch),后续重放时只需一次 CPU-GPU 提交,大幅减少 kernel launch 开销。

文件: encoder_cudagraph.py (606行), gpu/cudagraph_utils.py (459行)

9.2 CUDA Graph 原理图

GPU CPU GPU CPU 传统执行模式 - 每步多次 CPU-GPU 通信 4次launch开销 ~4x us CUDA Graph 模式 - 一次提交 4个kernel自动执行 1次launch开销 ~1x us kernel1 launch kernel2 launch kernel3 launch kernel4 launch graph replay

9.3 encoder_cudagraph.py 分析

"""
EncoderCudaGraphManager - 编码器 CUDA Graph 管理器。
管理多模态编码器(如 Vision Transformer)的 CUDA Graph 捕获和重放。

为什么编码器需要单独的 CUDA Graph?
- 编码器(如 ViT)的输入大小变化频繁(不同分辨率的图像)
- 解码器(LLM)的 batch size 变化频繁
- 分离管理可以独立优化各自的 Graph

CUDA Graph 的限制:
- 捕获时的输入形状必须在重放时保持一致
- 因此需要为每种输入大小捕获不同的 Graph
"""

class EncoderCudaGraphManager:
    """编码器 CUDA Graph 管理器。"""
    
    def __init__(self, model_runner):
        self.model_runner = model_runner
        # Graph 字典: {(num_images, image_size): CUDAGraphWrapper}
        self.graphs: dict[tuple[int, ...], CUDAGraphWrapper] = {}
    
    def capture(self, capture_sizes: list[tuple[int, ...]]) -> None:
        """为指定的输入大小捕获 CUDA Graph。
        
        Args:
            capture_sizes: 输入大小列表,如 [(1, 224), (4, 224)]
        """
        for size_tuple in capture_sizes:
            # 分配 dummy 输入
            dummy_input = self._create_dummy_input(size_tuple)
            
            # 捕获 Graph
            graph = CUDAGraphWrapper()
            graph.capture(
                model_runner=self.model_runner,
                dummy_input=dummy_input,
            )
            
            self.graphs[size_tuple] = graph
    
    def replay(self, input_size: tuple[int, ...]) -> torch.Tensor:
        """重放匹配的 CUDA Graph。
        
        Args:
            input_size: 当前输入大小
            
        Returns:
            编码器输出张量
        """
        if input_size in self.graphs:
            graph = self.graphs[input_size]
            return graph.replay()
        else:
            # 未找到匹配的 Graph,退回到正常执行
            return self.model_runner.execute_encoder()

9.4 cudagraph_utils.py 核心函数

"""
CUDA Graph 工具函数 - 支持解码阶段的 CUDA Graph 优化。

核心设计:
- 解码阶段的 batch size 在运行时变化
- 为预定义的 batch size 集合捕获 Graph
- 运行时选择最接近的已捕获 Graph 重放
- 多余的 slot 用 padding 填充
"""

class CUDAGraphSizeBucket:
    """CUDA Graph 大小桶 - 将 batch size 映射到最近的已捕获 Graph。
    
    策略: 使用桶式分配,例如:
    - 桶边界: [1, 2, 4, 8, 16, 32, 64, 128, ...]
    - batch_size=5 -> 使用桶 8 的 Graph
    - batch_size=30 -> 使用桶 32 的 Graph
    
    多余的 slot 的处理:
    - 使用 padding token 填充
    - 在采样结果中丢弃 padding 位置的输出
    """
    
    def __init__(self, capture_sizes: list[int]):
        self.capture_sizes = sorted(capture_sizes)
        self.size_to_graph_index = {}
        for i, size in enumerate(self.capture_sizes):
            self.size_to_graph_index[size] = i
    
    def get_graph_size(self, actual_size: int) -> int:
        """获取实际 batch size 对应的 Graph 大小。
        返回 >= actual_size 的最小捕获大小。
        """
        for size in self.capture_sizes:
            if size >= actual_size:
                return size
        return 0  # 超出范围,不使用 CUDA Graph

9.5 CUDA Graph 捕获与重放流程

CUDAGraph GPU CUDAGraphManager ModelRunner CUDAGraph GPU CUDAGraphManager ModelRunner 捕获阶段 Capture - 启动时执行一次 记录所有 kernel launch loop [每个捕获大小] 重放阶段 Replay - 每次推理 一次提交执行所有 kernel capture(capture_sizes) 分配 dummy 输入张量 begin_capture() execute_model(dummy_input) end_capture() 返回 CUDAGraphWrapper replay(batch_size) 查找匹配的 Graph 拷贝输入到 Graph 的输入张量 replay() 输出就绪 返回输出

9.6 CUDA Graph 模式切换

渲染错误: Mermaid 渲染失败: Parse error on line 2: ...eDiagram-v2 [*] -> None: 初始状态 ----------------------^ Expecting 'SPACE', 'NL', 'DESCR', '-->', 'HIDE_EMPTY', 'scale', 'COMPOSIT_STATE', 'STRUCT_STOP', 'STATE_DESCR', 'ID', 'FORK', 'JOIN', 'CHOICE', 'CONCURRENT', 'note', 'acc_title', 'acc_descr', 'acc_descr_multiline_value', 'CLICK', 'STRING', 'HREF', 'classDef', 'style', 'class', 'direction_tb', 'direction_bt', 'direction_rl', 'direction_lr', 'EDGE_STATE', 'STYLE_SEPARATOR', got 'INVALID'

9.7 大小桶映射示意

捕获大小桶

实际 Batch Size

向上取整

向上取整

向上取整

向上取整

超出范围

batch_size=3

batch_size=5

batch_size=10

batch_size=30

batch_size=200

1

2

4

8

16

32

64

128

Fallback
不使用Graph


十、并行策略工具

10.1 数据并行 (dp_utils.py)

文件: dp_utils.py (223行)

"""
DP (Data Parallel) 工具 - 数据并行协调。

推理场景的 DP 特殊性:
- 推理无需梯度同步
- 但需要协调批次分配,确保负载均衡
- 支持 EPLB(Expert Parallelism Load Balancing)
"""

def coordinate_batch_across_dp(
    scheduler_output: SchedulerOutput,
    vllm_config: VllmConfig,
) -> SchedulerOutput:
    """在 Data Parallel 组间协调批次。
    
    执行逻辑:
    1. Driver worker 的调度器决定了批次内容
    2. 通过 AllGather 将批次信息广播到所有 DP worker
    3. 每个 DP worker 根据自己的 rank 过滤出需要处理的请求
    
    Args:
        scheduler_output: 调度器原始输出
        vllm_config: 全局配置
        
    Returns:
        过滤后的 SchedulerOutput
    """
    dp_size = vllm_config.parallel_config.data_parallel_size
    if dp_size <= 1:
        return scheduler_output
    # 使用 AllGather 同步调度信息

10.2 上下文并行 (cp_utils.py)

文件: cp_utils.py (58行), gpu/cp_utils.py (61行)

"""
CP (Context Parallel) 工具 - 上下文并行。

上下文并行的目的:
- 将超长序列的 KV Cache 分片到多个 GPU
- 每个 CP rank 只存储 1/cp_size 的 KV Cache
- 注意力计算时需要跨 rank 通信

适用场景:
- 超长上下文(128K+ tokens)
- 单个 GPU 放不下完整 KV Cache
"""

def get_total_cp_world_size() -> int:
    """获取 CP world size。"""
    from vllm.distributed.parallel_state import get_cp_group
    cp_group = get_cp_group()
    return cp_group.world_size if cp_group else 1

def check_attention_cp_compatibility(attention_spec) -> None:
    """检查注意力规格是否与 CP 兼容。
    某些注意力类型不支持 CP: SlidingWindow, ChunkedLocal
    """

10.3 专家并行负载均衡 (eplb_utils.py)

文件: gpu/eplb_utils.py (141行)

"""
EPLB (Expert Parallelism Load Balancing) 工具。

MoE (Mixture of Experts) 模型的专家并行负载均衡。

问题:
- MoE 模型(如 Mixtral)有多个专家
- 每个 token 被路由到 top-k 个专家
- 不同 token 的路由分布不均匀 -> 某些专家过载
- 过载专家成为瓶颈,其他专家闲置

解决方案:
- EPLB 动态调整专家到 GPU 的映射
- 将热门专家分布到不同 GPU
- 定期重平衡(每个 N 步执行一次)
"""

def maybe_eplb_rebalance(
    model_runner,
    scheduler_output,
) -> None:
    """检查是否需要执行 EPLB 重平衡。
    
    触发条件:
    - 距离上次重平衡已过 N 步
    - 检测到负载不均衡超过阈值
    
    重平衡步骤:
    1. 收集各专家的负载统计
    2. 计算新的专家-GPU 映射
    3. 迁移专家权重到新 GPU
    4. 更新路由表
    """

10.4 流水线并行 (gpu/pp_utils.py)

文件: gpu/pp_utils.py (41行)

"""
PP (Pipeline Parallel) 工具 - 流水线并行。

流水线并行的目的:
- 将模型的不同层分配到不同 GPU
- GPU 间按流水线方式传递中间激活
- 支持微批次(micro-batch)重叠执行
"""

10.5 并行策略关系图

Worker 中的处理

并行策略矩阵

通信: AllReduce

通信: P2P

通信: AllGather

通信: AllGather

负载均衡: EPLB

Tensor Parallel
张量并行
切分模型权重到多GPU

Pipeline Parallel
流水线并行
切分模型层到多GPU

Data Parallel
数据并行
多副本处理不同请求

Context Parallel
上下文并行
切分长序列KV Cache

Expert Parallel
专家并行
MoE专家分布到多GPU

NCCL GPU通信库

EPLB 专家重分布

dp_utils.py
coordinate_batch_across_dp()

cp_utils.py
check_attention_cp_compatibility()

eplb_utils.py
maybe_eplb_rebalance()

pp_utils.py
pipeline_stage管理

10.6 并行策略组合矩阵

典型并行组合

最常见

跨机扩展

吞吐优先

MoE专用

超长序列

TP only
单机多GPU
如 4x A100

TP + PP
跨机多GPU
如 2机 x 4GPU

TP + DP
多副本并行
如 2组 x 4GPU

TP + EP
MoE大模型
如 Mixtral 8x7B

TP + PP + CP
超长上下文
如 128K context

推荐

大模型

高吞吐

MoE

长上下文


十一、通用工具函数

11.1 utils.py 核心函数

文件: utils.py (542行)

"""
Worker 通用工具函数 - 提供块表绑定、KV Cache 共享、内核参数计算等基础功能。
"""

bind_kv_cache() - 绑定 KV Cache 到模型:

def bind_kv_cache(
    model: torch.nn.Module,           # 模型实例
    kv_cache: dict[str, torch.Tensor], # KV Cache 字典
    kv_cache_config: KVCacheConfig,    # KV Cache 配置
) -> None:
    """将分配好的 KV Cache 张量绑定到模型的各注意力层。
    
    为什么要绑定?
    - 模型的注意力层(Attention)在前向传播时需要访问 KV Cache
    - 绑定是将"分配"和"使用"解耦:Worker 分配,模型使用
    
    绑定过程:
    1. 遍历模型的所有注意力层
    2. 根据层的名称在 kv_cache 字典中查找对应的缓存
    3. 将缓存张量设置为层的 kv_cache 属性
    """
    attention_layers = get_layers_from_vllm_config(model, ...)
    for layer_name, layer in attention_layers.items():
        if layer_name in kv_cache:
            layer.kv_cache = kv_cache[layer_name]

add_kv_sharing_layers_to_kv_cache_groups() - KV Cache 共享:

def add_kv_sharing_layers_to_kv_cache_groups(
    kv_cache_config: KVCacheConfig,
    model_config: ModelConfig,
) -> KVCacheConfig:
    """将可共享 KV Cache 的层添加到同一缓存组。
    
    KV Cache 共享的场景:
    - 某些模型结构中,不同层使用相同的 KV Cache
      (如某些编码器-解码器架构)
    - 共享可以减少显存占用
    
    实现:
    - 分析模型结构,找到可共享的层对
    - 将它们归入同一个 KVCacheGroupSpec
    - 同组内的层共享同一个物理 KV Cache
    """

KVBlockZeroer - KV Cache 块清零:

class KVBlockZeroer:
    """KV Cache 块清零器 - 在释放块时清零其内容。
    
    为什么要清零?
    - 安全性: 防止残留数据泄露
    - 正确性: 某些注意力后端依赖零值初始化
    - 调试: 清零后的输出异常更易定位
    
    性能优化:
    - 不是逐元素清零,而是按块(block)批量清零
    - 使用 CUDA kernel 并行清零
    """
    
    def __init__(
        self,
        kv_cache: dict[str, torch.Tensor],
        block_size: int,
        num_kv_heads: int,
        head_dim: int,
        dtype: torch.dtype,
    ):
        self.kv_cache = kv_cache
        self.block_size = block_size
        self.block_bytes = block_size * num_kv_heads * head_dim * dtype.itemsize * 2
    
    def zero_blocks(self, block_ids: list[int]) -> None:
        """清零指定的 KV Cache 块。"""
        for layer_name, cache_tensor in self.kv_cache.items():
            for block_id in block_ids:
                cache_tensor[block_id].zero_()

AttentionGroup - 注意力组:

class AttentionGroup:
    """注意力组 - 将具有相同 KV Cache 规格的层分组。
    
    目的:
    - 同组的层共享相同的 KV Cache 配置
    - 便于批量操作(如清零、拷贝)
    """
    
    def __init__(
        self,
        group_id: int,
        num_layers: int,
        kv_cache_spec: KVCacheSpec,
    ):
        self.group_id = group_id
        self.num_layers = num_layers
        self.kv_cache_spec = kv_cache_spec

prepare_kernel_block_sizes() - 内核块大小计算:

def prepare_kernel_block_sizes(
    num_heads: int,
    head_dim: int,
    block_size: int,
    dtype: torch.dtype,
) -> tuple[int, int, int]:
    """计算 CUDA 内核的块大小参数。
    
    Paged Attention CUDA kernel 需要知道:
    - 每个线程块处理多少个 head
    - 每个线程处理多少个 token
    - 共享内存大小
    
    Returns:
        (thread_block_size, num_threads, shared_mem_bytes)
    """

sanity_check_mm_encoder_outputs() - 多模态编码器输出检查:

def sanity_check_mm_encoder_outputs(
    encoder_outputs: MultiModalEmbeddings,
    expected_sizes: list[tuple[int, ...]],
) -> None:
    """检查多模态编码器输出是否符合预期。
    
    验证:
    - 输出张量的形状是否正确
    - 批次大小是否匹配
    - 数据类型是否正确
    """

is_residual_scattered_for_sp() - SP 残差散射检查:

def is_residual_scattered_for_sp(
    model: torch.nn.Module,
    vllm_config: VllmConfig,
) -> bool:
    """检查模型是否为 Sequence Parallel 使用残差散射。
    Sequence Parallel 是 TP 的扩展:
    - TP 切分注意力头的计算
    - SP 额外切分 LayerNorm 和残差连接的计算
    - 残差散射(residual scatter)是一种通信优化模式
    """

11.2 工具函数调用关系

运行阶段

释放KV Cache块

KVBlockZeroer.zero_blocks()

模型前向传播

is_residual_scattered_for_sp()

多模态编码

sanity_check_mm_encoder_outputs()

初始化阶段

GPUWorker.initialize_cache()

bind_kv_cache(model, kv_cache, config)

遍历模型层,绑定KV Cache

GPUModelRunner.__init__()

add_kv_sharing_layers_to_kv_cache_groups()

合并共享层到同一组

GPUModelRunner.warm_up_model()

prepare_kernel_block_sizes()

计算CUDA内核参数

11.3 注意力工具 (gpu/attn_utils.py)

文件: gpu/attn_utils.py (372行)

"""
注意力工具 - 构建注意力元数据、处理注意力分组和布局。

核心功能:
1. 为 Paged Attention 构建元数据(seq_lens, block_table, slot_mapping)
2. 处理不同注意力后端(FlashAttention、xFormers、rocmFlashAttn)
3. 支持 prefill/decode 分离排序
4. 支持 cascade attention(共享前缀注意力优化)
"""

def get_seq_lens(
    input_batch: InputBatch,
    max_seq_len: int,
) -> tuple[torch.Tensor, torch.Tensor]:
    """从输入批次提取序列长度信息。
    Returns:
        seq_lens: [num_reqs] 各请求的当前序列长度
        max_seq_len: 标量,批次中的最大序列长度
    """

def get_slot_mapping(
    input_batch: InputBatch,
    num_tokens: int,
) -> torch.Tensor:
    """构建 slot 映射 - 将 token 位置映射到 KV Cache 的物理位置。
    slot = block_id * block_size + offset_in_block
    Returns:
        slot_mapping: [num_tokens] 每个 token 对应的物理 slot 编号
    """

def create_fast_prefill_custom_backend(
    attn_backend: AttentionBackend,
    ...
) -> AttentionBackend:
    """创建快速预填充后端。
    对 prefill 阶段使用优化的注意力实现,
    可能与 decode 阶段使用不同的后端。
    """

11.4 缓冲区工具 (gpu/buffer_utils.py)

文件: gpu/buffer_utils.py (217行)

"""
缓冲区工具 - 管理推理过程中的临时 GPU 缓冲区。

设计目的:
- 避免频繁的 cudaMalloc/cudaFree
- 预分配固定大小的缓冲区,运行时复用
- 支持 pinned memory 加速 CPU-GPU 传输
"""

class BufferManager:
    """缓冲区管理器 - 预分配和复用 GPU 缓冲区。"""
    
    def __init__(self, device: torch.device, max_num_tokens: int):
        # 预分配各种大小的缓冲区
        self.input_ids_buffer = torch.zeros(
            max_num_tokens, dtype=torch.int32, device=device
        )
        self.positions_buffer = torch.zeros(
            max_num_tokens, dtype=torch.int32, device=device
        )

11.5 异步工具 (gpu/async_utils.py)

文件: gpu/async_utils.py (120行)

"""
异步工具 - 支持推理过程中的异步操作。

核心场景:
- 异步输出拷贝: GPU-CPU 的非阻塞传输
- 异步调度: 调度和推理重叠执行
- 双缓冲: 当前步骤的输出和下一步骤的输入并行处理
"""

def create_async_copy_stream() -> torch.cuda.Stream:
    """创建异步拷贝 CUDA Stream。
    用于 GPU-CPU 的非阻塞数据传输。
    """
    return torch.cuda.Stream()

11.6 GPU 状态 (gpu/states.py)

文件: gpu/states.py (141行)

"""
GPU 状态管理 - 维护推理过程中的 GPU 状态。

包括:
- CUDA Graph 的输入/输出缓冲区状态
- LoRA 适配器状态
- 专家路由状态
"""

class GPUStates:
    """GPU 状态容器。"""
    
    def __init__(self, device: torch.device):
        self.device = device
        # CUDA Graph 输入缓冲区
        self.cudagraph_input_buffers: dict[int, torch.Tensor] = {}
        # CUDA Graph 输出缓冲区
        self.cudagraph_output_buffers: dict[int, torch.Tensor] = {}

11.7 预热逻辑 (gpu/warmup.py)

文件: gpu/warmup.py (153行)

"""
预热逻辑 - 在正式推理前执行预热操作。

预热目的:
1. 初始化 CUDA context(首次 CUDA 调用会触发 JIT 编译)
2. 预热 NCCL 通信(首次 AllReduce 会建立连接)
3. 触发 Torch 编译(如果启用了 torch.compile)
4. 捕获 CUDA Graph
"""

def warm_up_model(
    model_runner,
    ...
) -> None:
    """执行模型预热。"""

11.8 结构化输出 (gpu/structured_outputs.py)

文件: gpu/structured_outputs.py (115行)

"""
结构化输出 - 支持语法约束的采样(JSON Schema、正则表达式等)。

实现方式:
- 使用 grammar bitmask 约束每个位置的合法 token
- 在采样时应用 bitmask,禁止非法 token
- 支持 xGrammar / LALG 等语法引擎
"""

def apply_grammar_bitmask(
    logits: torch.Tensor,
    grammar_bitmask: torch.Tensor,
    ...
) -> None:
    """将语法约束 bitmask 应用到 logits。
    将非法 token 的 logit 设为 -inf,确保不会被采样到。
    """

11.9 关闭处理 (gpu/shutdown.py)

文件: gpu/shutdown.py (20行)

"""
关闭处理 - Worker 关闭时的清理操作。
释放 GPU 资源、关闭 NCCL 通信等。
"""

11.10 Pipeline Parallel 工具 (gpu/pp_utils.py)

文件: gpu/pp_utils.py (41行)

"""
Pipeline Parallel 工具 - 流水线并行的辅助函数。

处理:
- 流水线阶段的中间激活传递
- 微批次(micro-batch)的拆分和合并
"""

11.11 KV 连接器 (gpu/kv_connector.py)

文件: gpu/kv_connector.py (132行)

"""
KV 连接器 - 管理跨设备的 KV Cache 传输。

场景:
- 分布式推理中,KV Cache 需要在 GPU 间传输
- KV Cache 的预填充可能在不同设备完成
- 支持 KV Cache 的发送和接收操作
"""

class KVConnector:
    """KV 连接器 - 管理跨设备 KV Cache 传输。"""
    
    def send_kv_blocks(
        self,
        block_ids: list[int],
        ...
    ) -> None:
        """发送 KV Cache 块到远程设备。"""
    
    def recv_kv_blocks(
        self,
        block_ids: list[int],
        ...
    ) -> None:
        """从远程设备接收 KV Cache 块。"""

11.12 LoRA 工具 (gpu/lora_utils.py)

文件: gpu/lora_utils.py (44行)

"""
LoRA 工具 - LoRA 适配器的辅助函数。

LoRA (Low-Rank Adaptation) 允许在基座模型上动态加载
小型适配器权重,实现模型定制化而无需重新训练整个模型。
"""

11.13 Logits 指标 (gpu/metrics/logits.py)

文件: gpu/metrics/logits.py (42行)

"""
Logits 指标 - 记录和监控 logits 的统计信息。

用于调试和性能分析:
- logits 的最大值/最小值
- 采样前的温度分布
- 词汇表利用率
"""

附录 A: 关键数据结构速查表

数据结构 文件 用途
WorkerBase worker_base.py Worker 抽象基类
GPUWorker gpu_worker.py GPU 推理 Worker
CPUWorker cpu_worker.py CPU 推理 Worker
XPUWorker xpu_worker.py XPU 推理 Worker
GPUModelRunner gpu_model_runner.py GPU 模型执行器
BlockTable block_table.py KV Cache 块表
GPUBlockTable gpu/block_table.py GPU 优化块表
WorkspaceManager workspace.py 工作空间管理
KVBlockZeroer utils.py KV Cache 块清零
AttentionGroup utils.py 注意力分组
CUDAGraphSizeBucket gpu/cudagraph_utils.py CUDA Graph 大小桶
BufferManager gpu/buffer_utils.py 缓冲区管理
ExecuteModelState gpu_model_runner.py 执行模型状态
EncoderCudaGraphManager encoder_cudagraph.py 编码器 CUDA Graph 管理器
KVConnector gpu/kv_connector.py KV 连接器
GPUStates gpu/states.py GPU 状态容器

附录 B: 关键方法调用链

GPUWorker.execute_model(scheduler_output)
├── WorkerBase.execute_model()
│   ├── model_runner.execute_model(scheduler_output)
│   │   ├── update_from_schedule(scheduler_output)     # 更新批次状态
│   │   ├── _prepare_inputs()                           # 准备模型输入
│   │   │   ├── input_batch.update()                    # 更新输入批次
│   │   │   ├── attn_utils.get_seq_lens()               # 获取序列长度
│   │   │   ├── attn_utils.get_slot_mapping()           # 构建slot映射
│   │   │   └── attn_metadata_builder.build()           # 构建注意力元数据
│   │   ├── model.forward(input_ids, positions, ...)    # 模型前向传播
│   │   └── return ExecuteModelState / ModelRunnerOutput
│   └── model_runner.sample_tokens(state)               # 采样
│       ├── sampler(logits, sampling_metadata)           # 执行采样
│       └── return ModelRunnerOutput
├── sleep() / wake_up()                                  # CPU offloading
└── check_health()                                        # 健康检查

附录 C: 配置依赖关系

dtype, max_model_len,
num_heads, architectures

block_size, cache_dtype,
swap_space

max_num_batched_tokens,
max_num_seqs

tp_size, pp_size,
dp_size, cp_size

max_lora_rank,
lora_extra_vocab_size

num_spec_tokens,
spec_decode_method

VllmConfig

ModelConfig
模型配置

CacheConfig
缓存配置

SchedulerConfig
调度配置

ParallelConfig
并行配置

LoRAConfig
LoRA配置

SpeculativeConfig
推测解码配置

ObservabilityConfig
可观测性配置

GPUModelRunner

GPUWorker

LoRA

SpecDecode

附录 D: KV Cache 内存布局

KV Cache 内存组织

block_size=16

block_size=16

KV Cache for Layer i

Key Cache
shape: [num_blocks, num_kv_heads, head_dim, block_size]

Value Cache
shape: [num_blocks, num_kv_heads, head_dim, block_size]

Block 0

Block 1

Block 2

Block N

Token 0-15
的 Key 向量

Token 16-31
的 Key 向量


Part 1 结束 - 本文件覆盖了 Worker 层的架构总览、类层次体系、核心基类/实现、块表管理和 CUDA Graph 机制。

Part 2 将深入分析: 输入批处理系统(InputBatch)和采样管线(Sampler/Logprobs/Penalties)。

Part 3 将深入分析: GPUModelRunner(7174行核心)、推测解码、多模态编码、KV 连接器、LoRA、模型状态等。

Logo

免费领 100 小时云算力,进群参与显卡、AI PC 幸运抽奖

更多推荐