vllm分析(四)——kv cache的初始化
本文分析 vLLM 中 KV cache 的初始化流程,包括 KV cache 张量(Tensor)的分配,以及如何将其绑定(赋值)到各 Attention 层。
·
获取 KV Cache 规格 (get_kv_cache_specs)
EngineCore._initialize_kv_caches
│
├─> self.model_executor.get_kv_cache_specs()
│ │
│ └─> collective_rpc("get_kv_cache_spec") [广播到所有 Worker]
│ │
│ ▼
│ Worker.get_kv_cache_spec
│ │
│ ▼
│ GPUModelRunner.get_kv_cache_spec
│ │
│ ├─> 若启用了 EC 传输且本实例不是 consumer(即 producer)→ 返回 {}
│ │
│ ├─> 获取所有 Attention 层 (get_layers_from_vllm_config)
│ │
│ ├─> 对每一层:
│ │ ├─> 若有 kv_sharing_target_layer_name
│ │ │ └─> 记录共享映射,跳过本层
│ │ └─> 否则调用 attn_module.get_kv_cache_spec
│ │ └─> 得到该层的规格 (dtype, shape, block_size 等)
│ │
│ └─> 返回 dict[layer_name, KVCacheSpec]
│
└─> 返回 list[dict[str, KVCacheSpec]] (每个 Worker 一份)
collective_rpc 的第一个参数是要在每个 Worker 上调用的方法名,即 vllm.v1.worker.gpu_worker.Worker 中同名的方法。
get_kv_cache_spec
先看一下 DeepSeek V3.2 闪电索引器(Lightning Indexer)所定义的 KV cache spec:
# https://github.com/vllm-project/vllm/blob/v0.20.1/vllm/model_executor/models/deepseek_v2.py#L580
class DeepseekV32IndexerCache(torch.nn.Module, AttentionLayerBase):
    """Cache-only layer used by the DeepSeek V3.2 indexer.

    The module performs no computation (``forward`` is a stub). It registers
    itself in the static forward context under ``prefix`` so that the KV cache
    initialization can query its ``KVCacheSpec`` and later bind an allocated
    tensor to ``self.kv_cache``.
    """

    def __init__(
        self, head_dim: int, dtype: torch.dtype, prefix: str, cache_config: CacheConfig
    ):
        super().__init__()
        # Empty placeholder; overwritten when the allocated KV cache is bound
        # to this layer (bind_kv_cache assigns `<layer>.kv_cache`).
        self.kv_cache = torch.tensor([])
        self.head_dim = head_dim
        self.prefix = prefix
        self.cache_config = cache_config
        self.dtype = dtype
        # Register this module under its layer name; a second registration of
        # the same name is rejected.
        compilation_config = get_current_vllm_config().compilation_config
        if prefix in compilation_config.static_forward_context:
            raise ValueError(f"Duplicate layer name: {prefix}")
        compilation_config.static_forward_context[prefix] = self

    def get_kv_cache_spec(self, vllm_config: VllmConfig) -> KVCacheSpec:
        """Return the cache spec: a single head of ``head_dim`` per token.

        Args:
            vllm_config: Global vLLM config (unused here).
        Returns:
            An ``MLAAttentionSpec`` with ``num_kv_heads=1`` and this layer's
            block size and dtype.
        """
        return MLAAttentionSpec(  # Only has one vector instead of K + V
            block_size=self.cache_config.block_size,
            num_kv_heads=1,
            head_size=self.head_dim,
            dtype=self.dtype,
        )

    def forward(self): ...

    def get_attn_backend(self) -> AttentionBackend:
        """Return the attention backend class used for this cache layer."""
        return DeepseekV32IndexerBackend
生成全局 KV Cache 配置
EngineCore._initialize_kv_caches (续)
│
├─> 获得 available_gpu_memory
│
├─> get_kv_cache_configs(vllm_config, kv_cache_specs, available_gpu_memory)
│ │
│ ├─> 合并所有 Worker 的规格
│ ├─> 根据可用显存计算总块数 (num_blocks)
│ ├─> 决定内存布局:
│ │ ├─> 哪些层可以共享同一个物理张量 (kv_cache_tensors)
│ │ │ └─> 每个共享张量包含: size, shared_by (层名列表)
│ │ └─> 每个 Worker 独立的 KVCacheConfig
│ │
│ └─> 返回 list[KVCacheConfig] (每个 Worker 一个配置)
│
└─> (准备调用 initialize_from_config)
初始化 KV Cache (initialize_from_config)
EngineCore._initialize_kv_caches (续)
│
└─> self.model_executor.initialize_from_config(kv_cache_configs)
│
└─> collective_rpc("initialize_from_config", args=(kv_cache_configs,))
│
▼
Worker.initialize_from_config(kv_cache_config)
│
├─> 更新 self.cache_config.num_gpu_blocks
│
├─> ensure_kv_transfer_initialized (初始化 KV 传输组件)
│
└─> self.model_runner.initialize_kv_cache(kv_cache_config)
│
▼
GPUModelRunner.initialize_kv_cache
│
├─> 调用 initialize_kv_cache_tensors()
│ │
│ ├─> 尝试 allocate_uniform_kv_caches (优化路径)
│ │ └─> 直接得到 kv_caches 字典
│ │
│ └─> 否则走通用路径:
│ ├─> _allocate_kv_cache_tensors
│ │ └─> 根据 kv_cache_config.kv_cache_tensors
│ │ 为每个共享张量创建 torch.zeros(int8)
│ │ 得到 kv_cache_raw_tensors
│ │
│ └─> _reshape_kv_cache_tensors
│ └─> 重塑为各层所需形状
│ 返回 kv_caches: dict[layer_name, tensor]
│
└─> bind_kv_cache(kv_caches, forward_context, runner_kv_caches)
│
├─> 将 kv_caches 中的张量按层索引顺序填入 runner_kv_caches 列表
│
└─> 对每一层: forward_context[layer_name].kv_cache = kv_caches[layer_name]
主要代码
EngineCore
class EngineCore:
    """Excerpt of vLLM's EngineCore: only the KV cache initialization step."""

    # https://github.com/vllm-project/vllm/blob/v0.20.1/vllm/v1/engine/core.py#L128
    def _initialize_kv_caches(self, vllm_config: VllmConfig) -> KVCacheConfig:
        """Gather KV cache specs from all workers, plan the layout, allocate it.

        Args:
            vllm_config: Global vLLM config passed to the layout planner.
        """
        # Get all kv cache needed by the model (one dict per worker).
        kv_cache_specs = self.model_executor.get_kv_cache_specs()

        # NOTE(excerpt): `available_gpu_memory` is computed in code elided from
        # this excerpt (presumably the per-worker memory budget for the KV
        # cache — confirm against upstream core.py).
        kv_cache_configs = get_kv_cache_configs(
            vllm_config, kv_cache_specs, available_gpu_memory
        )

        # Initialize kv cache and warmup the execution
        self.model_executor.initialize_from_config(kv_cache_configs)

        # NOTE(excerpt): the return statement promised by the
        # `-> KVCacheConfig` annotation is not shown in this excerpt.
model_executor的函数调用
class Executor(ABC):
    """Excerpt of vLLM's abstract Executor: the KV-cache-related RPC fan-out."""

    # https://github.com/vllm-project/vllm/blob/v0.20.1/vllm/v1/executor/abstract.py#L149
    def get_kv_cache_specs(self) -> list[dict[str, KVCacheSpec]]:
        """Broadcast ``get_kv_cache_spec`` to every worker.

        Returns:
            One ``dict[layer_name, KVCacheSpec]`` per worker.
        """
        return self.collective_rpc("get_kv_cache_spec")

    # https://github.com/vllm-project/vllm/blob/v0.20.1/vllm/v1/executor/abstract.py#L118
    def initialize_from_config(self, kv_cache_configs: list[KVCacheConfig]) -> None:
        """
        Initialize the KV caches and begin the model execution loop of the
        underlying workers.
        """
        # The whole list is sent to every worker, while
        # Worker.initialize_from_config takes a single KVCacheConfig.
        # NOTE(review): presumably a worker-side wrapper picks this worker's
        # entry from the list — confirm in the collective_rpc / worker wrapper.
        self.collective_rpc("initialize_from_config", args=(kv_cache_configs,))
collective_rpc 的第一个参数是要在每个 Worker 上调用的方法名,即 vllm.v1.worker.gpu_worker.Worker 中同名的方法。
Worker
class Worker(WorkerBase):
    """Excerpt of vLLM's GPU Worker: the KV-cache-related RPC handlers."""

    # https://github.com/vllm-project/vllm/blob/v0.20.1/vllm/v1/worker/gpu_worker.py#L501
    def get_kv_cache_spec(self) -> dict[str, KVCacheSpec]:
        """Return this worker's per-layer KV cache specs (from the model runner)."""
        return self.model_runner.get_kv_cache_spec()

    # https://github.com/vllm-project/vllm/blob/v0.20.1/vllm/v1/worker/gpu_worker.py#L517
    def initialize_from_config(self, kv_cache_config: KVCacheConfig) -> None:
        """Allocate GPU KV cache with the specified kv_cache_config."""
        # Update local config with adjusted num blocks after profiling,
        # so that it's available to the warmup stage.
        self.cache_config.num_gpu_blocks = kv_cache_config.num_blocks

        # Init kv cache connector here, because it requires
        # `kv_cache_config`.
        # NOTE(Kuntai): This need to be done before `initialize_kv_cache`,
        # because `initialize_kv_cache` will inject kv cache groups not
        # related to kv cache connector (e.g. kv cache sharing layers).
        ensure_kv_transfer_initialized(self.vllm_config, kv_cache_config)

        if self.vllm_config.model_config.enable_sleep_mode:
            # In sleep mode, allocate the KV cache inside a memory pool tagged
            # "kv_cache". NOTE(review): presumably so the sleep-mode allocator
            # can handle these buffers separately — confirm in cumem.py.
            from vllm.device_allocator.cumem import CuMemAllocator

            allocator = CuMemAllocator.get_instance()
            with allocator.use_memory_pool(tag="kv_cache"):
                self.model_runner.initialize_kv_cache(kv_cache_config)
        else:
            self.model_runner.initialize_kv_cache(kv_cache_config)
model_runner
class GPUModelRunner(
    LoRAModelRunnerMixin, KVConnectorModelRunnerMixin, ECConnectorModelRunnerMixin
):
    """Excerpt of vLLM's GPUModelRunner: only the KV-cache-related methods."""

    # https://github.com/vllm-project/vllm/blob/v0.20.1/vllm/v1/worker/gpu_model_runner.py#L6522
    def _allocate_kv_cache_tensors(
        self, kv_cache_config: KVCacheConfig
    ) -> dict[str, torch.Tensor]:
        """
        Initializes the KV cache buffer with the correct size. The buffer needs
        to be reshaped to the desired shape before being used by the models.

        Args:
            kv_cache_config: The KV cache config
        Returns:
            dict[str, torch.Tensor]: A map between layer names to their
            corresponding memory buffer for KV cache.
        """
        kv_cache_raw_tensors: dict[str, torch.Tensor] = {}
        for kv_cache_tensor in kv_cache_config.kv_cache_tensors:
            # Flat zero-filled int8 buffer (1 byte per element) of `size`
            # elements on the runner's device.
            tensor = torch.zeros(
                kv_cache_tensor.size, dtype=torch.int8, device=self.device
            )
            # All layers in `shared_by` map to the same tensor object, so they
            # share the underlying memory.
            for layer_name in kv_cache_tensor.shared_by:
                kv_cache_raw_tensors[layer_name] = tensor

        # Sanity check: the layers that received a buffer must be exactly the
        # layers of all KV cache groups, excluding runner-only attention layers.
        layer_names = set()
        for group in kv_cache_config.kv_cache_groups:
            for layer_name in group.layer_names:
                if layer_name in self.runner_only_attn_layers:
                    continue
                layer_names.add(layer_name)
        assert layer_names == set(kv_cache_raw_tensors.keys()), (
            "Some layers are not correctly initialized"
        )
        return kv_cache_raw_tensors

    # https://github.com/vllm-project/vllm/blob/v0.20.1/vllm/v1/worker/gpu_model_runner.py#L6724
    def initialize_kv_cache_tensors(
        self, kv_cache_config: KVCacheConfig, kernel_block_sizes: list[int]
    ) -> dict[str, torch.Tensor]:
        """Allocate the KV cache tensors and bind them to the attention layers.

        Uses the uniform allocation path when ``use_uniform_kv_cache`` allows
        it; otherwise it allocates raw int8 buffers and reshapes them per
        layer. Either way the result is bound to the forward context and to
        ``self.kv_caches``.

        Args:
            kv_cache_config: Layout plan (tensor sizes, sharing, groups).
            kernel_block_sizes: Forwarded to the allocation / reshape helpers.
        Returns:
            dict[layer_name, torch.Tensor] with each layer's KV cache.
        """
        # Try creating KV caches optimized for kv-connector transfers
        cache_dtype = self.cache_config.cache_dtype
        if self.use_uniform_kv_cache(self.attn_groups, cache_dtype):
            kv_caches, cross_layers_kv_cache, attn_backend = (
                self.allocate_uniform_kv_caches(
                    kv_cache_config,
                    self.attn_groups,
                    cache_dtype,
                    self.device,
                    kernel_block_sizes,
                )
            )
            self.cross_layers_kv_cache = cross_layers_kv_cache
            self.cross_layers_attn_backend = attn_backend
        else:
            # Fallback to the general case
            # Initialize the memory buffer for KV cache
            kv_cache_raw_tensors = self._allocate_kv_cache_tensors(kv_cache_config)
            # Change the memory buffer to the desired shape
            kv_caches = self._reshape_kv_cache_tensors(
                kv_cache_raw_tensors, kernel_block_sizes
            )

        # NOTE(excerpt): `num_attn_module` is defined in code elided from this
        # excerpt; bind_kv_cache forwards it to extract_layer_index.
        bind_kv_cache(
            kv_caches,
            self.compilation_config.static_forward_context,
            self.kv_caches,
            num_attn_module,
        )
        return kv_caches

    # https://github.com/vllm-project/vllm/blob/v0.20.1/vllm/v1/worker/gpu_model_runner.py#L6807
    def initialize_kv_cache(
        self,
        kv_cache_config: KVCacheConfig,
        is_profiling: bool = False,
    ) -> None:
        """Allocate and bind the KV cache described by ``kv_cache_config``.

        Excerpt: only the tensor-allocation call is shown.
        """
        # NOTE(excerpt): `kernel_block_sizes` is computed in code elided from
        # this excerpt.
        kv_caches = self.initialize_kv_cache_tensors(
            kv_cache_config, kernel_block_sizes
        )

    # https://github.com/vllm-project/vllm/blob/v0.20.1/vllm/v1/worker/gpu_model_runner.py#L6944
    def get_kv_cache_spec(self) -> dict[str, KVCacheSpec]:
        """
        Generates the KVCacheSpec by parsing the kv cache format from each
        Attention module in the static forward context.

        Returns:
            KVCacheSpec: A dictionary mapping layer names to their KV cache
            format. Layers that do not need KV cache are not included.
        """
        # With EC (encoder cache) transfer enabled, an instance that is not the
        # consumer reports no KV cache layers at all.
        if has_ec_transfer() and not get_ec_transfer().is_consumer:
            return {}

        kv_cache_spec: dict[str, KVCacheSpec] = {}
        layer_type = cast(type[Any], AttentionLayerBase)
        attn_layers = get_layers_from_vllm_config(self.vllm_config, layer_type)
        for layer_name, attn_module in attn_layers.items():
            if isinstance(attn_module, Attention) and (
                kv_tgt_layer := attn_module.kv_sharing_target_layer_name
            ):
                # The layer doesn't need its own KV cache and will use that of
                # the target layer. We skip creating a KVCacheSpec for it, so
                # that KV cache management logic will act as this layer does
                # not exist, and doesn't allocate KV cache for the layer. This
                # enables the memory saving of cross-layer kv sharing, allowing
                # a given amount of memory to accommodate longer context lengths
                # or enable more requests to be processed simultaneously.
                self.shared_kv_cache_layers[layer_name] = kv_tgt_layer
                continue
            # Skip modules that don't need KV cache (eg encoder-only attention)
            if spec := attn_module.get_kv_cache_spec(self.vllm_config):
                kv_cache_spec[layer_name] = spec

        return kv_cache_spec
# https://github.com/vllm-project/vllm/blob/v0.20.1/vllm/v1/worker/utils.py#L457
def bind_kv_cache(
    kv_caches: dict[str, torch.Tensor],
    forward_context: dict[str, Attention],
    runner_kv_caches: list[torch.Tensor],
    num_attn_module: int = 1,
) -> None:
    """
    Bind the allocated KV cache to both ModelRunner and forward context so
    that the KV cache can be used in the forward pass.

    This function:
      1) Fills the ModelRunner's kv cache list (`runner_kv_caches`) with
         kv_caches.
      2) Associates each attention layer in the `forward_context` with its
         corresponding KV cache in kv_caches.

    Args:
        kv_caches: The allocated kv_caches with layer names as keys.
        forward_context: The global forward context containing all Attention
            layers with layer names as keys.
        runner_kv_caches: The kv_cache declared by ModelRunner. Must be empty
            on entry; it is filled in place, ordered by layer index.
        num_attn_module: Passed through to ``extract_layer_index``.
    """
    # Bind kv_caches to ModelRunner
    assert len(runner_kv_caches) == 0

    # Convert kv_caches dict to a list of tensors in the order of layer_index.
    index2name = defaultdict(list)
    for layer_name in kv_caches:
        index2name[extract_layer_index(layer_name, num_attn_module)].append(layer_name)

    for layer_index in sorted(index2name.keys()):
        layer_names = index2name[layer_index]
        if len(layer_names) > 1:
            # One typical case is encoder-decoder model, e.g., bart.
            # The cross attention and self attention in the same decoder layer
            # has different layer_name but the same layer_index.

            # TODO - analyze where runner_kv_caches is used and the right
            # way to ensure it properly reflects multiple attention layers
            # in the same decoder block.
            if (
                current_platform.is_cuda_alike()
                or current_platform.is_xpu()
                or current_platform.is_cpu()
            ):
                # We know that the GPU / CPU runner is not impacted by this
                # case. Some test code depends on runner_kv_caches, but
                # not in a way that's impacted by ignoring this.
                pass
            else:
                raise NotImplementedError

        # Every layer sharing this index is appended, in insertion order.
        for layer_name in layer_names:
            runner_kv_caches.append(kv_caches[layer_name])

    # Bind kv_caches to forward context
    for layer_name, kv_cache in kv_caches.items():
        forward_context[layer_name].kv_cache = kv_cache
_allocate_kv_cache_tensors 处理流程图
开始:_allocate_kv_cache_tensors(kv_cache_config)
│
├─> 输入参数:kv_cache_config (包含 kv_cache_tensors 和 kv_cache_groups)
│
├─> 步骤 1:分配共享张量
│ │
│ ├─> 初始化空字典 kv_cache_raw_tensors = {}
│ │
│ └─> 遍历 kv_cache_config.kv_cache_tensors:
│ │
│ ├─> 对每个 kv_cache_tensor:
│ │ ├─> tensor = torch.zeros(size, dtype=torch.int8, device=self.device)
│ │ │ └─> 在 GPU(或指定设备)上分配全零的 int8 字节缓冲区
│ │ │
│ │ └─> 遍历 kv_cache_tensor.shared_by(共享该张量的所有层名):
│ │ └─> kv_cache_raw_tensors[layer_name] = tensor
│ │ (多个层名指向同一个 tensor 对象 → 内存共享)
│ │
│ └─> 继续下一个 kv_cache_tensor
│
├─> 步骤 2:验证层完整性
│ │
│ ├─> 收集期望的层名集合 layer_names = set()
│ │ │
│ │ └─> 遍历 kv_cache_config.kv_cache_groups:
│ │ └─> 遍历 group.layer_names:
│ │ └─> 如果 layer_name 不在 self.runner_only_attn_layers 中:
│ │ └─> layer_names.add(layer_name)
│ │
│ ├─> 获取实际分配了内存的层名集合 set(kv_cache_raw_tensors.keys())
│ │
│ └─> 断言 layer_names == 实际分配层名集合
│ └─> 若不一致 → 抛出 AssertionError,说明某些层未正确分配
│
├─> 步骤 3:返回结果
│ │
│ └─> 返回 kv_cache_raw_tensors (dict[layer_name, torch.Tensor])
│ └─> 注意:此时张量仍是未整形的 int8 原始缓冲区
│
结束
reference
更多推荐

所有评论(0)