【vllm】(v1 Worker)vLLM V1 Worker— Part 1: 架构总览与核心类
vLLM V1 Worker 超深度架构分析 — Part 1: 架构总览与核心类
vLLM V1 Worker 超深度架构分析 — Part 1: 架构总览与核心类
分析目标目录:
github.com/vllm/vllm/v1/worker
代码总量: ~25,000 行 Python | 70+ 源文件
目录
- 模块定位与业务职责
- 系统架构总览
- Worker 类层次体系
- WorkerBase 基类深度分析
- GPUWorker 深度分析
- CPUWorker / XPUWorker 深度分析
- Workspace 工作空间管理
- BlockTable 块表管理
- CUDA Graph 机制
- 并行策略工具
- 通用工具函数
一、模块定位与业务职责
1.1 业务定位
vLLM V1 Worker 是整个推理引擎的执行层,负责:
| 职责 | 说明 |
|---|---|
| 模型推理执行 | 接收调度器下发的请求批次,执行模型前向传播 |
| KV Cache 管理 | 管理显存中的 KV Cache 块分配、释放、换入换出 |
| 采样与输出 | 对 logits 进行采样(贪心/随机/beam),生成 token |
| 推测解码 | 集成 EAGLE/Medusa/n-gram 等投机采样加速方案 |
| 多模态处理 | 处理图像/音频/视频等多模态输入的编码 |
| 分布式协调 | Tensor Parallel / Pipeline Parallel / Data Parallel 协调 |
| CUDA Graph 优化 | 捕获和重放 CUDA Graph 减少内核启动开销 |
1.2 在系统中的位置
1.3 数据流入流出
二、系统架构总览
2.1 目录结构
v1/worker/
├── __init__.py # 空包初始化
├── worker_base.py # [344行] Worker抽象基类
├── gpu_worker.py # [1160行] GPU Worker实现
├── cpu_worker.py # [228行] CPU Worker实现
├── xpu_worker.py # [119行] XPU Worker实现
├── gpu_model_runner.py # [7174行] ⭐核心 GPU模型运行器
├── gpu_input_batch.py # [1121行] GPU输入批次状态管理
├── gpu_ubatch_wrapper.py # [510行] 微批次包装器
├── ubatch_utils.py # [265行] 微批次工具
├── ubatching.py # [241行] 微批次调度逻辑
├── block_table.py # [380行] 块表数据结构
├── workspace.py # [279行] 工作空间管理
├── utils.py # [542行] 通用工具函数
├── dp_utils.py # [223行] 数据并行工具
├── cp_utils.py # [58行] 上下文并行工具
├── mamba_utils.py # [273行] Mamba状态管理
├── kv_connector_model_runner_mixin.py # [283行] KV连接器Mixin
├── ec_connector_model_runner_mixin.py # [78行] EC连接器Mixin
├── lora_model_runner_mixin.py # [288行] LoRA Mixin
├── encoder_cudagraph.py # [606行] 编码器CUDA Graph
├── encoder_cudagraph_defs.py # [68行] 编码器CUDA Graph定义
├── cpu_model_runner.py # [257行] CPU模型运行器
├── xpu_model_runner.py # [55行] XPU模型运行器
├── tpu_input_batch.py # [574行] TPU输入批次
│
├── gpu/ # GPU特定子模块
│ ├── model_runner.py # [1428行] GPU模型运行器(子类)
│ ├── input_batch.py # [588行] GPU输入批次
│ ├── block_table.py # [285行] GPU块表
│ ├── attn_utils.py # [372行] 注意力工具
│ ├── buffer_utils.py # [217行] 缓冲区管理
│ ├── async_utils.py # [120行] 异步工具
│ ├── cudagraph_utils.py # [459行] CUDA Graph工具
│ ├── states.py # [141行] GPU状态
│ ├── warmup.py # [153行] 预热逻辑
│ ├── structured_outputs.py # [115行] 结构化输出
│ ├── kv_connector.py # [132行] KV连接器
│ ├── lora_utils.py # [44行] LoRA工具
│ ├── eplb_utils.py # [141行] 专家并行负载均衡
│ ├── dp_utils.py # [117行] 数据并行工具
│ ├── cp_utils.py # [61行] 上下文并行工具
│ ├── pp_utils.py # [41行] 流水线并行工具
│ ├── shutdown.py # [20行] 关闭处理
│ ├── metrics/logits.py # [42行] logits指标
│ ├── mm/ # 多模态子模块
│ │ ├── encoder_cache.py # [40行] 编码器缓存
│ │ ├── encoder_runner.py # [148行] 编码器运行器
│ │ └── rope.py # [197行] 旋转位置编码
│ ├── model_states/ # 模型状态子模块
│ │ ├── interface.py # [96行] 状态接口
│ │ ├── default.py # [199行] 默认状态
│ │ ├── mamba_hybrid.py # [150行] Mamba混合状态
│ │ └── whisper.py # [204行] Whisper状态
│ ├── pool/ # 池化子模块
│ │ ├── pooling_runner.py # [46行] 池化运行器
│ │ └── late_interaction_runner.py # [166行] 迟交互运行器
│ ├── sample/ # 采样子模块
│ │ ├── sampler.py # [197行] 采样器
│ │ ├── states.py # [104行] 采样状态
│ │ ├── output.py # [15行] 采样输出
│ │ ├── logprob.py # [250行] 对数概率
│ │ ├── prompt_logprob.py # [236行] 提示对数概率
│ │ ├── penalties.py # [310行] 惩罚项
│ │ ├── logit_bias.py # [280行] logit偏置
│ │ ├── min_p.py # [60行] min-p采样
│ │ ├── gumbel.py # [211行] Gumbel采样
│ │ └── bad_words.py # [194行] 坏词过滤
│ └── spec_decode/ # 推测解码子模块
│ ├── eagle/speculator.py # [887行] EAGLE推测器
│ ├── eagle/cudagraph.py # [123行] EAGLE CUDA Graph
│ ├── eagle/utils.py # [67行] EAGLE工具
│ ├── eagle/eagle3_utils.py # [46行] EAGLE3工具
│ ├── rejection_sampler.py # [163行] 拒绝采样器
│ ├── probabilistic_rejection_sampler_utils.py # [642行] 概率拒绝采样
│ ├── synthetic_rejection_sampler_utils.py # [95行] 合成拒绝采样
│ └── utils.py # [47行] 推测解码工具
2.2 模块依赖关系图
2.3 核心组件交互全景图
三、Worker 类层次体系
3.1 类继承图
3.2 Mixin 组合模式
GPUModelRunner 使用 Python 的 Mixin 多继承模式来组合不同功能:
3.3 Worker 与 ModelRunner 的委托关系
四、WorkerBase 基类深度分析
4.1 源码逐行分析
文件: worker_base.py (344行)
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
"""
WorkerBase - 所有 Worker 实现的抽象基类。
定义了推理引擎执行层的标准接口,由 GPUWorker/CPUWorker/XPUWorker 具体实现。
"""
import functools
from abc import ABC, abstractmethod
from typing import Optional
import torch
from vllm.config import VllmConfig, set_current_vllm_config
from vllm.logger import init_logger
from vllm.v1.core.sched.output import SchedulerOutput # 调度器输出
from vllm.v1.outputs import ModelRunnerOutput # 模型运行结果
logger = init_logger(__name__)
类定义与构造函数:
class WorkerBase(ABC):
"""Worker 基类 - 定义推理执行层的统一接口。
每个 Worker 持有一个 ModelRunner 实例,负责:
1. 设备初始化 (GPU/CPU/XPU)
2. 模型加载与 KV Cache 分配
3. 接收调度器输出,执行模型推理
4. 返回采样结果
设计模式: 模板方法模式 (Template Method)
- 基类定义 execute_model() 的骨架流程
- 子类实现 init_device() / determine_available_memory() 等具体步骤
"""
def __init__(
self,
vllm_config: VllmConfig, # vLLM 全局配置对象
local_cache: Optional[str] = None, # 本地缓存路径
log_stats: bool = False, # 统计日志开关
):
# 保存全局配置引用 - 所有子模块通过此引用获取配置
self.vllm_config = vllm_config
# 本地缓存路径 - 避免重复下载模型权重
self.local_cache = local_cache
# 统计日志开关
self.log_stats = log_stats
# 睡眠状态标志 - 用于 CPU offloading 场景
self._is_sleeping = False
init_device() - 设备初始化(抽象方法):
@abstractmethod
def init_device(self) -> None:
"""初始化计算设备。
GPUWorker: 设置 CUDA 设备、初始化 NCCL 通信组
CPUWorker: 设置 CPU 线程数、内存分配策略
XPUWorker: 设置 Intel XPU 设备、oneAPI 通信
此方法在 Worker 生命周期中只调用一次。
"""
raise NotImplementedError
determine_available_memory() - 可用显存计算(抽象方法):
@abstractmethod
def determine_available_memory(self) -> int:
"""计算可用于 KV Cache 的显存量(字节数)。
计算逻辑(GPUWorker 为例):
1. 总 GPU 显存
2. 减去模型权重占用
3. 减去 CUDA Graph 占用
4. 减去临时缓冲区占用
5. 剩余即为 KV Cache 可用空间
Returns:
int: 可用显存字节数
"""
raise NotImplementedError
initialize_cache() - KV Cache 初始化(抽象方法):
@abstractmethod
def initialize_cache(
self,
num_gpu_blocks: int, # GPU 上分配的 KV Cache 块数
num_cpu_blocks: int, # CPU 上分配的 KV Cache 块数(用于 swap)
) -> None:
"""根据调度器计算出的块数,分配 KV Cache 显存。
此方法在 determine_available_memory() 之后调用,
调度器根据可用显存计算出能分配多少块,然后调用此方法执行实际分配。
"""
raise NotImplementedError
execute_model() - 模型推理执行(模板方法):
def execute_model(
self,
scheduler_output: SchedulerOutput,
) -> ModelRunnerOutput:
"""执行一次模型推理 - Worker 的核心入口方法。
执行流程:
1. 设置当前 vllm_config 上下文
2. 调用 model_runner.execute_model() 执行前向传播
3. 如果返回 ExecuteModelState(需要采样),则继续采样
4. 返回 ModelRunnerOutput
"""
with set_current_vllm_config(self.vllm_config):
result = self.model_runner.execute_model(scheduler_output)
# execute_model() 可能返回:
# 1. None - 无需执行(空批次或仅 cache 操作)
# 2. ExecuteModelState - 前向传播完成,需要采样
# 3. ModelRunnerOutput - 直接返回结果(pooling 模型等)
if result is None:
return None
if isinstance(result, ModelRunnerOutput):
return result
# ExecuteModelState: 前向传播完成但采样在下一步
return self.model_runner.sample_tokens(result)
sleep() / wake_up() - 睡眠/唤醒:
def is_sleeping(self) -> bool:
"""检查 Worker 是否处于睡眠状态。"""
return self._is_sleeping
def sleep(self) -> None:
"""将 Worker 置于睡眠状态 - 释放 GPU 资源。
用于 CPU offloading 场景:当 GPU 显存紧张时,将模型权重卸载到 CPU。
"""
self._is_sleeping = True
def wake_up(self, tags: Optional[list[str]] = None) -> None:
"""唤醒 Worker - 将模型权重重新加载到 GPU。
Args:
tags: 唤醒标签,用于指定需要加载哪些层(部分唤醒优化)
"""
self._is_sleeping = False
check_health() - 健康检查:
def check_health(self) -> bool:
"""检查 Worker 是否健康。
返回 False 的情况: GPU 掉线、NCCL 通信超时、内存溢出
引擎定期调用此方法监控 Worker 状态。
"""
return True # 基类默认健康,子类可覆盖
4.2 WorkerBase 方法调用流程
4.3 WorkerBase 生命周期状态机
五、GPUWorker 深度分析
5.1 概述
GPUWorker 是 vLLM 的主力推理执行器,1160 行代码,覆盖了 GPU 推理的完整生命周期。
5.2 源码逐行分析
文件: gpu_worker.py (1160行)
# SPDX-License-Identifier: Apache-2.0
"""
GPUWorker - GPU 推理引擎的 Worker 实现。
负责 GPU 设备管理、模型加载、KV Cache 管理、分布式协调和推理执行。
"""
import os
import time
from typing import TYPE_CHECKING, Optional
import torch
import torch.distributed
from vllm.config import VllmConfig, set_current_vllm_config
from vllm.distributed.parallel_state import (
get_pp_group, # 流水线并行通信组
get_tp_group, # 张量并行通信组
is_global_first_rank, # 是否全局第一个rank(用于日志/监控)
)
from vllm.distributed.utils import StatelessProcessGroup
from vllm.logger import init_logger
from vllm.utils import get_ip, get_open_port
from vllm.v1.core.sched.output import SchedulerOutput
from vllm.v1.outputs import ModelRunnerOutput
from vllm.v1.worker.gpu_model_runner import GPUModelRunner
from vllm.v1.worker.worker_base import WorkerBase
构造函数 __init__():
class GPUWorker(WorkerBase):
"""GPU Worker - 在 GPU 上执行 LLM 推理的 Worker 实现。
关键职责:
1. 管理 GPU 设备 (CUDA 设备设置、NCCL 通信初始化)
2. 通过 GPUModelRunner 执行模型推理
3. 分配和管理 KV Cache 显存
4. 支持模型权重 CPU offloading (sleep/wake_up)
5. 支持动态 LoRA 加载
6. 支持分布式推理 (TP/PP/DP/CP/EP)
"""
def __init__(
self,
vllm_config: VllmConfig, # 全局配置
local_cache: Optional[str] = None, # 本地缓存路径
log_stats: bool = False, # 统计日志
engine_index: int = 0, # 引擎索引(多引擎场景)
):
super().__init__(
vllm_config=vllm_config,
local_cache=local_cache,
log_stats=log_stats,
)
# 引擎索引 - Data Parallel 场景下标识当前 Worker 属于哪个引擎
# 索引 0 的是 driver worker(负责调度协调)
self.engine_index = engine_index
# 是否为 driver worker(rank 0)
# 只有 driver worker 执行调度和采样
self.is_driver_worker = is_global_first_rank()
# 初始化 GPUModelRunner - Worker 的核心执行引擎
self.model_runner = GPUModelRunner(
vllm_config=self.vllm_config,
device=torch.device(f"cuda:{torch.cuda.current_device()}"),
)
# KV Cache 张量列表 - 由 initialize_cache() 分配
self.kv_cache: list[torch.Tensor] = []
# 输入注册张量 - 用于异步调度场景下的输入传递
self.input_register: Optional[torch.Tensor] = None
init_device() - GPU 设备初始化:
def init_device(self) -> None:
"""初始化 GPU 设备和分布式通信环境。
执行步骤:
1. 设置 CUDA 设备
2. 初始化 NCCL 通信组 (TP/PP)
3. 设置随机种子(保证分布式场景下确定性)
4. 清空 CUDA 缓存(预热 CUDA context)
"""
# 步骤1: 设置 CUDA 设备
# 每个进程绑定到对应的 GPU,避免多进程竞争
torch.cuda.set_device(
torch.distributed.get_rank() % torch.cuda.device_count()
)
self.device = torch.device(f"cuda:{torch.cuda.current_device()}")
# 步骤2: 清空 CUDA 缓存 - 触发 CUDA context 初始化
torch.cuda.empty_cache()
# 步骤3: 设置随机种子
seed = self.vllm_config.model_config.seed
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
logger.info(
"Initializing GPUWorker with device=%s, rank=%d",
self.device,
torch.distributed.get_rank(),
)
determine_available_memory() - 可用显存计算:
def determine_available_memory(self) -> int:
"""计算可用于 KV Cache 的 GPU 显存量(字节)。
计算公式:
可用显存 = 总 GPU 显存
- 模型权重占用
- 临时缓冲区占用
- CUDA Graph 占用
- 安全裕量
此方法通过 ModelRunner 的 profile_run() 来精确测量,
而不是简单估算。
Returns:
int: 可用于 KV Cache 的字节数
"""
with set_current_vllm_config(self.vllm_config):
available_memory = self.model_runner.determine_available_memory()
return available_memory
initialize_cache() - KV Cache 分配:
def initialize_cache(
self,
num_gpu_blocks: int, # GPU KV Cache 块数
num_cpu_blocks: int, # CPU KV Cache 块数(swap 用)
) -> None:
"""分配 KV Cache 显存。
KV Cache 的组织方式:
- 每个 Transformer 层有一对 K 和 V 缓存
- 缓存按块(block)管理,每个块存储固定数量的 token
- 块大小由 cache_config.block_size 决定(通常 16)
分配逻辑:
1. 计算每个块的字节大小
2. 分配 num_gpu_blocks 个块的 GPU 显存
3. 分配 num_cpu_blocks 个块的 CPU 内存(用于 swap)
4. 将分配的缓存绑定到 ModelRunner
"""
assert num_gpu_blocks > 0
with set_current_vllm_config(self.vllm_config):
self.model_runner.initialize_kv_cache(
num_gpu_blocks=num_gpu_blocks,
num_cpu_blocks=num_cpu_blocks,
)
execute_model() - 推理执行(核心方法):
def execute_model(
self,
scheduler_output: SchedulerOutput,
) -> Optional[ModelRunnerOutput]:
"""执行一次模型推理 - GPUWorker 的核心入口。
执行流程:
1. 检查 Worker 是否在睡眠状态 -> 需要先 wake_up
2. 协调 Data Parallel 的批次同步
3. 调用 ModelRunner.execute_model() 执行前向传播
4. 如果需要采样,执行采样
5. 返回结果
Args:
scheduler_output: 调度器输出,包含:
- scheduled_new_reqs: 新调度请求列表
- scheduled_cached_reqs: 缓存命中请求列表
- num_scheduled_tokens: 每个 request 调度的 token 数
- blocks_to_swap_in/out: KV Cache swap 操作
- blocks_to_copy: KV Cache copy-on-write 操作
"""
# 检查睡眠状态
if self.is_sleeping():
self.wake_up()
with set_current_vllm_config(self.vllm_config):
# Data Parallel 协调
if self.vllm_config.parallel_config.data_parallel_size > 1:
scheduler_output = coordinate_batch_across_dp(
scheduler_output, self.vllm_config
)
# 委托给 ModelRunner 执行
result = self.model_runner.execute_model(scheduler_output)
if result is None:
return None
# 如果返回 ExecuteModelState,需要执行采样
if hasattr(result, 'logits'):
return self.model_runner.sample_tokens(result)
return result
sleep() / wake_up() - CPU Offloading:
def sleep(self) -> None:
"""将模型权重卸载到 CPU,释放 GPU 显存。"""
super().sleep()
# 通知 ModelRunner 卸载模型权重
self.model_runner.sleep()
def wake_up(self, tags: Optional[list[str]] = None) -> None:
"""将模型权重重新加载到 GPU。"""
super().wake_up(tags)
# 通知 ModelRunner 重新加载权重
self.model_runner.wake_up(tags)
check_health() - GPU 健康检查:
def check_health(self) -> bool:
"""检查 GPU Worker 是否健康。
检查内容:
1. CUDA 设备是否可用
2. NCCL 通信是否正常
3. GPU 显存是否溢出
"""
try:
# 尝试在 GPU 上执行简单操作
torch.cuda.current_stream().synchronize()
return True
except RuntimeError:
return False
5.3 GPUWorker 完整生命周期
5.4 GPUWorker 的 sleep/wake_up 机制
5.5 GPUWorker 分布式通信架构
六、CPUWorker / XPUWorker 深度分析
6.1 CPUWorker
文件: cpu_worker.py (228行)
class CPUWorker(WorkerBase):
"""CPU Worker - 在 CPU 上执行 LLM 推理。
适用场景:
- 无 GPU 环境的推理(边缘设备、CI 测试)
- 模型量化的 CPU 推理(GGUF/GPTQ)
- 开发调试环境
与 GPUWorker 的区别:
1. 无 CUDA Graph 支持
2. 无 NCCL 通信(使用 Gloo 后端)
3. KV Cache 在 CPU 内存而非 GPU 显存
4. 无异步输出拷贝
"""
def __init__(self, vllm_config, local_cache=None, log_stats=False, engine_index=0):
super().__init__(vllm_config, local_cache, log_stats)
self.engine_index = engine_index
self.device = torch.device("cpu")
self.model_runner = CPUModelRunner(vllm_config, self.device)
self.kv_cache = []
init_device() - CPU 设备初始化:
def init_device(self) -> None:
"""初始化 CPU 推理设备。
设置: 1. CPU 线程数(OMP_NUM_THREADS)2. 内存分配策略 3. 随机种子
"""
# 设置 OpenMP 线程数 - 根据 CPU 核数和 TP size 自动计算
parallel_config = self.vllm_config.parallel_config
world_size = parallel_config.tensor_parallel_size
os.environ["OMP_NUM_THREADS"] = str(max(1, os.cpu_count() // world_size))
# 设置随机种子
seed = self.vllm_config.model_config.seed
torch.manual_seed(seed)
6.2 XPUWorker
文件: xpu_worker.py (119行)
class XPUWorker(WorkerBase):
"""XPU Worker - 在 Intel XPU (GPU) 上执行 LLM 推理。
适用场景:
- Intel Arc GPU / Data Center GPU Max
- oneAPI SYCL 后端
与 GPUWorker 的区别:
1. 使用 torch.xpu 而非 torch.cuda
2. 使用 XCCL 而非 NCCL
3. 设备名为 "xpu" 而非 "cuda"
"""
def __init__(self, vllm_config, local_cache=None, log_stats=False, engine_index=0):
super().__init__(vllm_config, local_cache, log_stats)
self.engine_index = engine_index
self.device = torch.device("xpu")
self.model_runner = XPUModelRunner(vllm_config, self.device)
self.kv_cache = []
6.3 三种 Worker 对比
七、Workspace 工作空间管理
7.1 概述
文件: workspace.py (279行)
Workspace 模块负责管理模型推理过程中的临时工作空间,包括中间张量的分配和释放。使用显存池(Memory Pool)设计模式避免频繁的 cudaMalloc/cudaFree 导致显存碎片化。
7.2 源码逐行分析
import threading
from contextlib import contextmanager
from typing import Optional
import torch
from vllm.config import VllmConfig
from vllm.logger import init_logger
logger = init_logger(__name__)
# 全局锁 - 防止多线程同时修改工作空间
_workspace_lock = threading.Lock()
def lock_workspace():
"""获取工作空间锁的上下文管理器。
用法:
with lock_workspace():
# 在此区域内安全修改工作空间
workspace.allocate(...)
"""
@contextmanager
def _lock():
_workspace_lock.acquire()
try:
yield
finally:
_workspace_lock.release()
return _lock()
WorkspaceManager 类:
class WorkspaceManager:
"""工作空间管理器 - 管理推理过程中的临时显存分配。
设计目的:
- 避免频繁的 cudaMalloc/cudaFree 导致显存碎片化
- 预分配一大块工作空间,运行时从中切分
- 推理结束后统一释放,而非逐步释放
这是一种显存池(Memory Pool)设计模式。
"""
def __init__(
self,
device: torch.device,
max_workspace_size: int, # 最大工作空间字节数
):
self.device = device
self.max_workspace_size = max_workspace_size
# 预分配工作空间 - 一大块连续显存
# 使用 torch.empty 而非 torch.zeros,不需要初始化
self.workspace_tensor = torch.empty(
max_workspace_size,
dtype=torch.uint8,
device=device,
)
# 当前偏移量 - 记录已分配的位置
self._offset = 0
def allocate(
self,
size: int, # 需要分配的字节数
alignment: int = 256, # 对齐要求(字节)
) -> torch.Tensor:
"""从工作空间中分配指定大小的显存。
分配策略:
1. 对齐偏移量到 alignment 的整数倍
2. 检查剩余空间是否足够
3. 切分出对应大小的子张量
4. 更新偏移量
Args:
size: 需要的字节数
alignment: 内存对齐要求(默认256字节,CUDA DMA 要求)
Returns:
子张量视图(不拷贝数据)
"""
# 对齐偏移量
self._offset = (self._offset + alignment - 1) // alignment * alignment
if self._offset + size > self.max_workspace_size:
raise RuntimeError(
f"Workspace exhausted: need {size} bytes at offset "
f"{self._offset}, but total is {self.max_workspace_size}"
)
# 切分子张量 - 使用视图操作,零拷贝
result = self.workspace_tensor[self._offset:self._offset + size]
self._offset += size
return result
def reset(self) -> None:
"""重置工作空间偏移量 - 所有分配的子张量失效。
在每个推理步骤开始时调用。
"""
self._offset = 0
7.3 工作空间内存布局
7.4 lock_workspace 使用场景
八、BlockTable 块表管理
8.1 概述
BlockTable 是 KV Cache 管理的核心数据结构,实现了页式缓存(Paged Attention)的映射表。
8.2 顶层 BlockTable 源码分析
文件: block_table.py (380行)
"""
BlockTable - KV Cache 块表,Paged Attention 的核心数据结构。
设计类比:
- 类似 OS 的页表: 逻辑地址 -> 物理地址映射
- 逻辑地址 = request 的 token 位置
- 物理地址 = GPU 上的 KV Cache 块编号
核心优势:
- 按需分配: 不需要预分配最大序列长度的 KV Cache
- 共享机制: 不同 request 可共享相同前缀的 KV Cache 块(copy-on-write)
- 碎片化消除: 块级管理避免了显存碎片化
"""
from typing import Optional
import numpy as np
import torch
from vllm.config.cache import CacheConfig
from vllm.logger import init_logger
from vllm.utils import cdiv
logger = init_logger(__name__)
BlockTable 类:
class BlockTable:
"""块表 - 管理 KV Cache 的逻辑块到物理块映射。
数据结构:
- block_table: 2D numpy 数组 [num_reqs, max_num_blocks_per_req]
- block_table[req_idx, block_idx] = 物理块编号
- 物理块编号 = -1 表示未分配
每个请求的块表是该请求 KV Cache 的"页表":
- 第 i 个逻辑块 -> block_table[req_idx, i] 指向的物理块
- 物理块中存储了 head_dim x block_size 个 token 的 K/V 向量
"""
def __init__(
self,
max_num_reqs: int, # 最大并发请求数
max_num_blocks_per_req: int, # 每个请求最大块数
block_size: int, # 每个块的 token 数(通常16)
pin_memory: bool = True, # 是否使用 pinned memory
):
self.max_num_reqs = max_num_reqs
self.max_num_blocks_per_req = max_num_blocks_per_req
self.block_size = block_size
self.pin_memory = pin_memory
# 核心: 2D 块表数组,初始化为 -1(表示未分配)
self.block_table_np = np.full(
(max_num_reqs, max_num_blocks_per_req),
-1,
dtype=np.int32,
)
# GPU 版本的块表 - 用于 CUDA 内核访问
if pin_memory:
self.block_table = torch.from_numpy(self.block_table_np).pin_memory()
else:
self.block_table = torch.from_numpy(self.block_table_np)
def add_row(self, req_index: int, block_ids: list[int]) -> None:
"""为新请求填充块表行。
Args:
req_index: 请求在批次中的索引
block_ids: 调度器分配的物理块 ID 列表
"""
num_blocks = len(block_ids)
self.block_table_np[req_index, :num_blocks] = block_ids
def move_row(
self,
src_req_index: int, # 源请求索引
dst_req_index: int, # 目标请求索引
) -> None:
"""将块表行从 src 移动到 dst。
场景: 批次重排序(如 decode-prefill 分离排序)
"""
self.block_table_np[dst_req_index] = self.block_table_np[src_req_index]
def commit(self, num_reqs: int) -> torch.Tensor:
"""将块表的前 num_reqs 行同步到 GPU。
在模型前向传播之前调用,确保 GPU 上的块表数据是最新的。
Args:
num_reqs: 当前批次中的请求数
Returns:
GPU 上的块表张量 [num_reqs, max_num_blocks_per_req]
"""
self.block_table[:num_reqs].copy_(
torch.from_numpy(self.block_table_np[:num_reqs])
)
return self.block_table[:num_reqs]
def append_row(self, req_index: int, block_id: int) -> None:
"""为请求追加一个新的物理块。
场景: 生成新 token 时需要扩展 KV Cache
"""
# 找到第一个 -1 的位置,填入新的 block_id
for i in range(self.max_num_blocks_per_req):
if self.block_table_np[req_index, i] == -1:
self.block_table_np[req_index, i] = block_id
return
def remove_row(self, req_index: int) -> None:
"""清除请求的块表行。
场景: 请求完成,释放 KV Cache
"""
self.block_table_np[req_index] = -1
8.3 GPU BlockTable
文件: gpu/block_table.py (285行)
class GPUBlockTable:
"""GPU 优化的块表 - 支持 CUDA Graph 的固定大小块表。
CUDA Graph 要求:
- 输入张量的大小在捕获和重放时必须一致
- 因此需要固定大小的块表,不足的部分用 padding 填充
设计:
- 维护一个固定大小的 GPU 块表张量
- 使用 NULL_BLOCK_ID (-1) 填充未使用的行/列
- 支持 cudagraph 模式和 dynamic 模式切换
"""
NULL_BLOCK_ID = -1 # 空块标识
def __init__(
self,
num_blocks: int, # 总物理块数
max_num_reqs: int, # 最大请求数
max_num_blocks_per_req: int, # 每请求最大块数
block_size: int, # 块大小
pin_memory: bool = True,
is_cudagraph: bool = False, # 是否为 CUDA Graph 模式
):
self.num_blocks = num_blocks
self.max_num_reqs = max_num_reqs
self.max_num_blocks_per_req = max_num_blocks_per_req
self.is_cudagraph = is_cudagraph
# CUDA Graph 模式下需要固定大小的张量
if is_cudagraph:
self.block_table = torch.full(
(max_num_reqs, max_num_blocks_per_req),
self.NULL_BLOCK_ID,
dtype=torch.int32,
device="cuda",
)
else:
# 动态模式: 初始为空,运行时根据批次大小调整
self.block_table = torch.empty(
(0, 0),
dtype=torch.int32,
device="cuda",
)
def update(
self,
block_table_np: np.ndarray, # numpy 块表数据
num_reqs: int, # 当前请求数
) -> torch.Tensor:
"""更新 GPU 块表。
将 numpy 块表数据拷贝到 GPU 张量。
CUDA Graph 模式下: 固定大小,直接拷贝
动态模式下: 调整大小后拷贝
"""
if self.is_cudagraph:
self.block_table[:num_reqs].copy_(
torch.from_numpy(block_table_np[:num_reqs])
)
else:
self.block_table = torch.from_numpy(
block_table_np[:num_reqs]
).to("cuda")
return self.block_table[:num_reqs]
8.4 块表在 Paged Attention 中的作用
8.5 块表操作流程
8.6 Slot Mapping 计算详解
九、CUDA Graph 机制
9.1 概述
CUDA Graph 是 vLLM 的关键性能优化技术。通过捕获一次完整的 GPU 执行流程(包含多个 kernel launch),后续重放时只需一次 CPU-GPU 提交,大幅减少 kernel launch 开销。
文件: encoder_cudagraph.py (606行), gpu/cudagraph_utils.py (459行)
9.2 CUDA Graph 原理图
9.3 encoder_cudagraph.py 分析
"""
EncoderCudaGraphManager - 编码器 CUDA Graph 管理器。
管理多模态编码器(如 Vision Transformer)的 CUDA Graph 捕获和重放。
为什么编码器需要单独的 CUDA Graph?
- 编码器(如 ViT)的输入大小变化频繁(不同分辨率的图像)
- 解码器(LLM)的 batch size 变化频繁
- 分离管理可以独立优化各自的 Graph
CUDA Graph 的限制:
- 捕获时的输入形状必须在重放时保持一致
- 因此需要为每种输入大小捕获不同的 Graph
"""
class EncoderCudaGraphManager:
"""编码器 CUDA Graph 管理器。"""
def __init__(self, model_runner):
self.model_runner = model_runner
# Graph 字典: {(num_images, image_size): CUDAGraphWrapper}
self.graphs: dict[tuple[int, ...], CUDAGraphWrapper] = {}
def capture(self, capture_sizes: list[tuple[int, ...]]) -> None:
"""为指定的输入大小捕获 CUDA Graph。
Args:
capture_sizes: 输入大小列表,如 [(1, 224), (4, 224)]
"""
for size_tuple in capture_sizes:
# 分配 dummy 输入
dummy_input = self._create_dummy_input(size_tuple)
# 捕获 Graph
graph = CUDAGraphWrapper()
graph.capture(
model_runner=self.model_runner,
dummy_input=dummy_input,
)
self.graphs[size_tuple] = graph
def replay(self, input_size: tuple[int, ...]) -> torch.Tensor:
"""重放匹配的 CUDA Graph。
Args:
input_size: 当前输入大小
Returns:
编码器输出张量
"""
if input_size in self.graphs:
graph = self.graphs[input_size]
return graph.replay()
else:
# 未找到匹配的 Graph,退回到正常执行
return self.model_runner.execute_encoder()
9.4 cudagraph_utils.py 核心函数
"""
CUDA Graph 工具函数 - 支持解码阶段的 CUDA Graph 优化。
核心设计:
- 解码阶段的 batch size 在运行时变化
- 为预定义的 batch size 集合捕获 Graph
- 运行时选择最接近的已捕获 Graph 重放
- 多余的 slot 用 padding 填充
"""
class CUDAGraphSizeBucket:
"""CUDA Graph 大小桶 - 将 batch size 映射到最近的已捕获 Graph。
策略: 使用桶式分配,例如:
- 桶边界: [1, 2, 4, 8, 16, 32, 64, 128, ...]
- batch_size=5 -> 使用桶 8 的 Graph
- batch_size=30 -> 使用桶 32 的 Graph
多余的 slot 的处理:
- 使用 padding token 填充
- 在采样结果中丢弃 padding 位置的输出
"""
def __init__(self, capture_sizes: list[int]):
self.capture_sizes = sorted(capture_sizes)
self.size_to_graph_index = {}
for i, size in enumerate(self.capture_sizes):
self.size_to_graph_index[size] = i
def get_graph_size(self, actual_size: int) -> int:
"""获取实际 batch size 对应的 Graph 大小。
返回 >= actual_size 的最小捕获大小。
"""
for size in self.capture_sizes:
if size >= actual_size:
return size
return 0 # 超出范围,不使用 CUDA Graph
9.5 CUDA Graph 捕获与重放流程
9.6 CUDA Graph 模式切换
9.7 大小桶映射示意
十、并行策略工具
10.1 数据并行 (dp_utils.py)
文件: dp_utils.py (223行)
"""
DP (Data Parallel) 工具 - 数据并行协调。
推理场景的 DP 特殊性:
- 推理无需梯度同步
- 但需要协调批次分配,确保负载均衡
- 支持 EPLB(Expert Parallelism Load Balancing)
"""
def coordinate_batch_across_dp(
scheduler_output: SchedulerOutput,
vllm_config: VllmConfig,
) -> SchedulerOutput:
"""在 Data Parallel 组间协调批次。
执行逻辑:
1. Driver worker 的调度器决定了批次内容
2. 通过 AllGather 将批次信息广播到所有 DP worker
3. 每个 DP worker 根据自己的 rank 过滤出需要处理的请求
Args:
scheduler_output: 调度器原始输出
vllm_config: 全局配置
Returns:
过滤后的 SchedulerOutput
"""
dp_size = vllm_config.parallel_config.data_parallel_size
if dp_size <= 1:
return scheduler_output
# 使用 AllGather 同步调度信息
10.2 上下文并行 (cp_utils.py)
文件: cp_utils.py (58行), gpu/cp_utils.py (61行)
"""
CP (Context Parallel) 工具 - 上下文并行。
上下文并行的目的:
- 将超长序列的 KV Cache 分片到多个 GPU
- 每个 CP rank 只存储 1/cp_size 的 KV Cache
- 注意力计算时需要跨 rank 通信
适用场景:
- 超长上下文(128K+ tokens)
- 单个 GPU 放不下完整 KV Cache
"""
def get_total_cp_world_size() -> int:
"""获取 CP world size。"""
from vllm.distributed.parallel_state import get_cp_group
cp_group = get_cp_group()
return cp_group.world_size if cp_group else 1
def check_attention_cp_compatibility(attention_spec) -> None:
"""检查注意力规格是否与 CP 兼容。
某些注意力类型不支持 CP: SlidingWindow, ChunkedLocal
"""
10.3 专家并行负载均衡 (eplb_utils.py)
文件: gpu/eplb_utils.py (141行)
"""
EPLB (Expert Parallelism Load Balancing) 工具。
MoE (Mixture of Experts) 模型的专家并行负载均衡。
问题:
- MoE 模型(如 Mixtral)有多个专家
- 每个 token 被路由到 top-k 个专家
- 不同 token 的路由分布不均匀 -> 某些专家过载
- 过载专家成为瓶颈,其他专家闲置
解决方案:
- EPLB 动态调整专家到 GPU 的映射
- 将热门专家分布到不同 GPU
- 定期重平衡(每个 N 步执行一次)
"""
def maybe_eplb_rebalance(
model_runner,
scheduler_output,
) -> None:
"""检查是否需要执行 EPLB 重平衡。
触发条件:
- 距离上次重平衡已过 N 步
- 检测到负载不均衡超过阈值
重平衡步骤:
1. 收集各专家的负载统计
2. 计算新的专家-GPU 映射
3. 迁移专家权重到新 GPU
4. 更新路由表
"""
10.4 流水线并行 (gpu/pp_utils.py)
文件: gpu/pp_utils.py (41行)
"""
PP (Pipeline Parallel) 工具 - 流水线并行。
流水线并行的目的:
- 将模型的不同层分配到不同 GPU
- GPU 间按流水线方式传递中间激活
- 支持微批次(micro-batch)重叠执行
"""
10.5 并行策略关系图
10.6 并行策略组合矩阵
十一、通用工具函数
11.1 utils.py 核心函数
文件: utils.py (542行)
"""
Worker 通用工具函数 - 提供块表绑定、KV Cache 共享、内核参数计算等基础功能。
"""
bind_kv_cache() - 绑定 KV Cache 到模型:
def bind_kv_cache(
model: torch.nn.Module, # 模型实例
kv_cache: dict[str, torch.Tensor], # KV Cache 字典
kv_cache_config: KVCacheConfig, # KV Cache 配置
) -> None:
"""将分配好的 KV Cache 张量绑定到模型的各注意力层。
为什么要绑定?
- 模型的注意力层(Attention)在前向传播时需要访问 KV Cache
- 绑定是将"分配"和"使用"解耦:Worker 分配,模型使用
绑定过程:
1. 遍历模型的所有注意力层
2. 根据层的名称在 kv_cache 字典中查找对应的缓存
3. 将缓存张量设置为层的 kv_cache 属性
"""
attention_layers = get_layers_from_vllm_config(model, ...)
for layer_name, layer in attention_layers.items():
if layer_name in kv_cache:
layer.kv_cache = kv_cache[layer_name]
add_kv_sharing_layers_to_kv_cache_groups() - KV Cache 共享:
def add_kv_sharing_layers_to_kv_cache_groups(
kv_cache_config: KVCacheConfig,
model_config: ModelConfig,
) -> KVCacheConfig:
"""将可共享 KV Cache 的层添加到同一缓存组。
KV Cache 共享的场景:
- 某些模型结构中,不同层使用相同的 KV Cache
(如某些编码器-解码器架构)
- 共享可以减少显存占用
实现:
- 分析模型结构,找到可共享的层对
- 将它们归入同一个 KVCacheGroupSpec
- 同组内的层共享同一个物理 KV Cache
"""
KVBlockZeroer - KV Cache 块清零:
class KVBlockZeroer:
"""KV Cache 块清零器 - 在释放块时清零其内容。
为什么要清零?
- 安全性: 防止残留数据泄露
- 正确性: 某些注意力后端依赖零值初始化
- 调试: 清零后的输出异常更易定位
性能优化:
- 不是逐元素清零,而是按块(block)批量清零
- 使用 CUDA kernel 并行清零
"""
def __init__(
self,
kv_cache: dict[str, torch.Tensor],
block_size: int,
num_kv_heads: int,
head_dim: int,
dtype: torch.dtype,
):
self.kv_cache = kv_cache
self.block_size = block_size
self.block_bytes = block_size * num_kv_heads * head_dim * dtype.itemsize * 2
def zero_blocks(self, block_ids: list[int]) -> None:
"""清零指定的 KV Cache 块。"""
for layer_name, cache_tensor in self.kv_cache.items():
for block_id in block_ids:
cache_tensor[block_id].zero_()
AttentionGroup - 注意力组:
class AttentionGroup:
"""注意力组 - 将具有相同 KV Cache 规格的层分组。
目的:
- 同组的层共享相同的 KV Cache 配置
- 便于批量操作(如清零、拷贝)
"""
def __init__(
self,
group_id: int,
num_layers: int,
kv_cache_spec: KVCacheSpec,
):
self.group_id = group_id
self.num_layers = num_layers
self.kv_cache_spec = kv_cache_spec
prepare_kernel_block_sizes() - 内核块大小计算:
def prepare_kernel_block_sizes(
num_heads: int,
head_dim: int,
block_size: int,
dtype: torch.dtype,
) -> tuple[int, int, int]:
"""计算 CUDA 内核的块大小参数。
Paged Attention CUDA kernel 需要知道:
- 每个线程块处理多少个 head
- 每个线程处理多少个 token
- 共享内存大小
Returns:
(thread_block_size, num_threads, shared_mem_bytes)
"""
sanity_check_mm_encoder_outputs() - 多模态编码器输出检查:
def sanity_check_mm_encoder_outputs(
encoder_outputs: MultiModalEmbeddings,
expected_sizes: list[tuple[int, ...]],
) -> None:
"""检查多模态编码器输出是否符合预期。
验证:
- 输出张量的形状是否正确
- 批次大小是否匹配
- 数据类型是否正确
"""
is_residual_scattered_for_sp() - SP 残差散射检查:
def is_residual_scattered_for_sp(
model: torch.nn.Module,
vllm_config: VllmConfig,
) -> bool:
"""检查模型是否为 Sequence Parallel 使用残差散射。
Sequence Parallel 是 TP 的扩展:
- TP 切分注意力头的计算
- SP 额外切分 LayerNorm 和残差连接的计算
- 残差散射(residual scatter)是一种通信优化模式
"""
11.2 工具函数调用关系
11.3 注意力工具 (gpu/attn_utils.py)
文件: gpu/attn_utils.py (372行)
"""
注意力工具 - 构建注意力元数据、处理注意力分组和布局。
核心功能:
1. 为 Paged Attention 构建元数据(seq_lens, block_table, slot_mapping)
2. 处理不同注意力后端(FlashAttention、xFormers、rocmFlashAttn)
3. 支持 prefill/decode 分离排序
4. 支持 cascade attention(共享前缀注意力优化)
"""
def get_seq_lens(
input_batch: InputBatch,
max_seq_len: int,
) -> tuple[torch.Tensor, torch.Tensor]:
"""从输入批次提取序列长度信息。
Returns:
seq_lens: [num_reqs] 各请求的当前序列长度
max_seq_len: 标量,批次中的最大序列长度
"""
def get_slot_mapping(
input_batch: InputBatch,
num_tokens: int,
) -> torch.Tensor:
"""构建 slot 映射 - 将 token 位置映射到 KV Cache 的物理位置。
slot = block_id * block_size + offset_in_block
Returns:
slot_mapping: [num_tokens] 每个 token 对应的物理 slot 编号
"""
def create_fast_prefill_custom_backend(
attn_backend: AttentionBackend,
...
) -> AttentionBackend:
"""创建快速预填充后端。
对 prefill 阶段使用优化的注意力实现,
可能与 decode 阶段使用不同的后端。
"""
11.4 缓冲区工具 (gpu/buffer_utils.py)
文件: gpu/buffer_utils.py (217行)
"""
缓冲区工具 - 管理推理过程中的临时 GPU 缓冲区。
设计目的:
- 避免频繁的 cudaMalloc/cudaFree
- 预分配固定大小的缓冲区,运行时复用
- 支持 pinned memory 加速 CPU-GPU 传输
"""
class BufferManager:
"""缓冲区管理器 - 预分配和复用 GPU 缓冲区。"""
def __init__(self, device: torch.device, max_num_tokens: int):
# 预分配各种大小的缓冲区
self.input_ids_buffer = torch.zeros(
max_num_tokens, dtype=torch.int32, device=device
)
self.positions_buffer = torch.zeros(
max_num_tokens, dtype=torch.int32, device=device
)
11.5 异步工具 (gpu/async_utils.py)
文件: gpu/async_utils.py (120行)
"""
异步工具 - 支持推理过程中的异步操作。
核心场景:
- 异步输出拷贝: GPU-CPU 的非阻塞传输
- 异步调度: 调度和推理重叠执行
- 双缓冲: 当前步骤的输出和下一步骤的输入并行处理
"""
def create_async_copy_stream() -> torch.cuda.Stream:
"""创建异步拷贝 CUDA Stream。
用于 GPU-CPU 的非阻塞数据传输。
"""
return torch.cuda.Stream()
11.6 GPU 状态 (gpu/states.py)
文件: gpu/states.py (141行)
"""
GPU 状态管理 - 维护推理过程中的 GPU 状态。
包括:
- CUDA Graph 的输入/输出缓冲区状态
- LoRA 适配器状态
- 专家路由状态
"""
class GPUStates:
"""GPU 状态容器。"""
def __init__(self, device: torch.device):
self.device = device
# CUDA Graph 输入缓冲区
self.cudagraph_input_buffers: dict[int, torch.Tensor] = {}
# CUDA Graph 输出缓冲区
self.cudagraph_output_buffers: dict[int, torch.Tensor] = {}
11.7 预热逻辑 (gpu/warmup.py)
文件: gpu/warmup.py (153行)
"""
预热逻辑 - 在正式推理前执行预热操作。
预热目的:
1. 初始化 CUDA context(首次 CUDA 调用会触发 JIT 编译)
2. 预热 NCCL 通信(首次 AllReduce 会建立连接)
3. 触发 Torch 编译(如果启用了 torch.compile)
4. 捕获 CUDA Graph
"""
def warm_up_model(
model_runner,
...
) -> None:
"""执行模型预热。"""
11.8 结构化输出 (gpu/structured_outputs.py)
文件: gpu/structured_outputs.py (115行)
"""
结构化输出 - 支持语法约束的采样(JSON Schema、正则表达式等)。
实现方式:
- 使用 grammar bitmask 约束每个位置的合法 token
- 在采样时应用 bitmask,禁止非法 token
- 支持 xGrammar / LALG 等语法引擎
"""
def apply_grammar_bitmask(
logits: torch.Tensor,
grammar_bitmask: torch.Tensor,
...
) -> None:
"""将语法约束 bitmask 应用到 logits。
将非法 token 的 logit 设为 -inf,确保不会被采样到。
"""
11.9 关闭处理 (gpu/shutdown.py)
文件: gpu/shutdown.py (20行)
"""
关闭处理 - Worker 关闭时的清理操作。
释放 GPU 资源、关闭 NCCL 通信等。
"""
11.10 Pipeline Parallel 工具 (gpu/pp_utils.py)
文件: gpu/pp_utils.py (41行)
"""
Pipeline Parallel 工具 - 流水线并行的辅助函数。
处理:
- 流水线阶段的中间激活传递
- 微批次(micro-batch)的拆分和合并
"""
11.11 KV 连接器 (gpu/kv_connector.py)
文件: gpu/kv_connector.py (132行)
"""
KV 连接器 - 管理跨设备的 KV Cache 传输。
场景:
- 分布式推理中,KV Cache 需要在 GPU 间传输
- KV Cache 的预填充可能在不同设备完成
- 支持 KV Cache 的发送和接收操作
"""
class KVConnector:
"""KV 连接器 - 管理跨设备 KV Cache 传输。"""
def send_kv_blocks(
self,
block_ids: list[int],
...
) -> None:
"""发送 KV Cache 块到远程设备。"""
def recv_kv_blocks(
self,
block_ids: list[int],
...
) -> None:
"""从远程设备接收 KV Cache 块。"""
11.12 LoRA 工具 (gpu/lora_utils.py)
文件: gpu/lora_utils.py (44行)
"""
LoRA 工具 - LoRA 适配器的辅助函数。
LoRA (Low-Rank Adaptation) 允许在基座模型上动态加载
小型适配器权重,实现模型定制化而无需重新训练整个模型。
"""
11.13 Logits 指标 (gpu/metrics/logits.py)
文件: gpu/metrics/logits.py (42行)
"""
Logits 指标 - 记录和监控 logits 的统计信息。
用于调试和性能分析:
- logits 的最大值/最小值
- 采样前的温度分布
- 词汇表利用率
"""
附录 A: 关键数据结构速查表
| 数据结构 | 文件 | 用途 |
|---|---|---|
WorkerBase |
worker_base.py | Worker 抽象基类 |
GPUWorker |
gpu_worker.py | GPU 推理 Worker |
CPUWorker |
cpu_worker.py | CPU 推理 Worker |
XPUWorker |
xpu_worker.py | XPU 推理 Worker |
GPUModelRunner |
gpu_model_runner.py | GPU 模型执行器 |
BlockTable |
block_table.py | KV Cache 块表 |
GPUBlockTable |
gpu/block_table.py | GPU 优化块表 |
WorkspaceManager |
workspace.py | 工作空间管理 |
KVBlockZeroer |
utils.py | KV Cache 块清零 |
AttentionGroup |
utils.py | 注意力分组 |
CUDAGraphSizeBucket |
gpu/cudagraph_utils.py | CUDA Graph 大小桶 |
BufferManager |
gpu/buffer_utils.py | 缓冲区管理 |
ExecuteModelState |
gpu_model_runner.py | 执行模型状态 |
EncoderCudaGraphManager |
encoder_cudagraph.py | 编码器 CUDA Graph 管理器 |
KVConnector |
gpu/kv_connector.py | KV 连接器 |
GPUStates |
gpu/states.py | GPU 状态容器 |
附录 B: 关键方法调用链
GPUWorker.execute_model(scheduler_output)
├── WorkerBase.execute_model()
│ ├── model_runner.execute_model(scheduler_output)
│ │ ├── update_from_schedule(scheduler_output) # 更新批次状态
│ │ ├── _prepare_inputs() # 准备模型输入
│ │ │ ├── input_batch.update() # 更新输入批次
│ │ │ ├── attn_utils.get_seq_lens() # 获取序列长度
│ │ │ ├── attn_utils.get_slot_mapping() # 构建slot映射
│ │ │ └── attn_metadata_builder.build() # 构建注意力元数据
│ │ ├── model.forward(input_ids, positions, ...) # 模型前向传播
│ │ └── return ExecuteModelState / ModelRunnerOutput
│ └── model_runner.sample_tokens(state) # 采样
│ ├── sampler(logits, sampling_metadata) # 执行采样
│ └── return ModelRunnerOutput
├── sleep() / wake_up() # CPU offloading
└── check_health() # 健康检查
附录 C: 配置依赖关系
附录 D: KV Cache 内存布局
Part 1 结束 - 本文件覆盖了 Worker 层的架构总览、类层次体系、核心基类/实现、块表管理和 CUDA Graph 机制。
Part 2 将深入分析: 输入批处理系统(InputBatch)和采样管线(Sampler/Logprobs/Penalties)。
Part 3 将深入分析: GPUModelRunner(7174行核心)、推测解码、多模态编码、KV 连接器、LoRA、模型状态等。
更多推荐


所有评论(0)