获取 KV Cache 规格 (get_kv_cache_specs)

EngineCore._initialize_kv_caches
    │
    ├─> self.model_executor.get_kv_cache_specs()
    │        │
    │        └─> collective_rpc("get_kv_cache_spec")   [广播到所有 Worker]
    │                  │
    │                  ▼
    │            Worker.get_kv_cache_spec
    │                  │
    │                  ▼
    │            GPUModelRunner.get_kv_cache_spec
    │                  │
    │                  ├─> 若处于 EC 传输 consumer 模式 → 返回 {}
    │                  │
    │                  ├─> 获取所有 Attention 层 (get_layers_from_vllm_config)
    │                  │
    │                  ├─> 对每一层:
    │                  │      ├─> 若有 kv_sharing_target_layer_name
    │                  │      │      └─> 记录共享映射,跳过本层
    │                  │      └─> 否则调用 attn_module.get_kv_cache_spec
    │                  │               └─> 得到该层的规格 (dtype, shape, block_size 等)
    │                  │
    │                  └─> 返回 dict[layer_name, KVCacheSpec]
    │
    └─> 返回 list[dict[str, KVCacheSpec]]   (每个 Worker 一份)

collective_rpc的第一个参数是vllm.v1.worker.gpu_worker.Worker中对应的函数名称。

get_kv_cache_spec

先看一下,deepseek 闪电索引定义的kv cache spec

# https://github.com/vllm-project/vllm/blob/v0.20.1/vllm/model_executor/models/deepseek_v2.py#L580
class DeepseekV32IndexerCache(torch.nn.Module, AttentionLayerBase):
    def __init__(
        self, head_dim: int, dtype: torch.dtype, prefix: str, cache_config: CacheConfig
    ):
        super().__init__()
        self.kv_cache = torch.tensor([])
        self.head_dim = head_dim
        self.prefix = prefix
        self.cache_config = cache_config
        self.dtype = dtype
        compilation_config = get_current_vllm_config().compilation_config
        if prefix in compilation_config.static_forward_context:
            raise ValueError(f"Duplicate layer name: {prefix}")
        compilation_config.static_forward_context[prefix] = self

    def get_kv_cache_spec(self, vllm_config: VllmConfig) -> KVCacheSpec:
        return MLAAttentionSpec(  # Only has one vector instead of K + V
            block_size=self.cache_config.block_size,
            num_kv_heads=1,
            head_size=self.head_dim,
            dtype=self.dtype,
        )

    def forward(self): ...

    def get_attn_backend(self) -> AttentionBackend:
        return DeepseekV32IndexerBackend

生成全局 KV Cache 配置

EngineCore._initialize_kv_caches (续)
    │
    ├─> 获得 available_gpu_memory
    │
    ├─> get_kv_cache_configs(vllm_config, kv_cache_specs, available_gpu_memory)
    │        │
    │        ├─> 合并所有 Worker 的规格
    │        ├─> 根据可用显存计算总块数 (num_blocks)
    │        ├─> 决定内存布局:
    │        │      ├─> 哪些层可以共享同一个物理张量 (kv_cache_tensors)
    │        │      │      └─> 每个共享张量包含: size, shared_by (层名列表)
    │        │      └─> 每个 Worker 独立的 KVCacheConfig
    │        │
    │        └─> 返回 list[KVCacheConfig]   (每个 Worker 一个配置)
    │
    └─> (准备调用 initialize_from_config)

初始化 KV Cache (initialize_from_config)

EngineCore._initialize_kv_caches (续)
    │
    └─> self.model_executor.initialize_from_config(kv_cache_configs)
              │
              └─> collective_rpc("initialize_from_config", args=(kv_cache_configs,))
                        │
                        ▼
                  Worker.initialize_from_config(kv_cache_config)
                        │
                        ├─> 更新 self.cache_config.num_gpu_blocks
                        │
                        ├─> ensure_kv_transfer_initialized (初始化 KV 传输组件)
                        │
                        └─> self.model_runner.initialize_kv_cache(kv_cache_config)
                                  │
                                  ▼
                            GPUModelRunner.initialize_kv_cache
                                  │
                                  ├─> 调用 initialize_kv_cache_tensors()
                                  │         │
                                  │         ├─> 尝试 allocate_uniform_kv_caches (优化路径)
                                  │         │      └─> 直接得到 kv_caches 字典
                                  │         │
                                  │         └─> 否则走通用路径:
                                  │                 ├─> _allocate_kv_cache_tensors
                                  │                 │      └─> 根据 kv_cache_config.kv_cache_tensors
                                  │                 │          为每个共享张量创建 torch.zeros(int8)
                                  │                 │          得到 kv_cache_raw_tensors
                                  │                 │
                                  │                 └─> _reshape_kv_cache_tensors
                                  │                        └─> 重塑为各层所需形状
                                  │                             返回 kv_caches: dict[layer_name, tensor]
                                  │
                                  └─> bind_kv_cache(kv_caches, forward_context, runner_kv_caches)
                                            │
                                            ├─> 将 kv_caches 中的张量按层索引顺序填入 runner_kv_caches 列表
                                            │
                                            └─> 对每一层: forward_context[layer_name].kv_cache = kv_caches[layer_name]

主要代码

EngineCore

class EngineCore:
# https://github.com/vllm-project/vllm/blob/v0.20.1/vllm/v1/engine/core.py#L128
    def _initialize_kv_caches(self, vllm_config: VllmConfig) -> KVCacheConfig:
        # Get all kv cache needed by the model
        kv_cache_specs = self.model_executor.get_kv_cache_specs()

        kv_cache_configs = get_kv_cache_configs(
            vllm_config, kv_cache_specs, available_gpu_memory
        )
        # Initialize kv cache and warmup the execution
        self.model_executor.initialize_from_config(kv_cache_configs)

model_executor的函数调用

class Executor(ABC)
# https://github.com/vllm-project/vllm/blob/v0.20.1/vllm/v1/executor/abstract.py#L149
    def get_kv_cache_specs(self) -> list[dict[str, KVCacheSpec]]:
        return self.collective_rpc("get_kv_cache_spec")

# https://github.com/vllm-project/vllm/blob/v0.20.1/vllm/v1/executor/abstract.py#L118
    def initialize_from_config(self, kv_cache_configs: list[KVCacheConfig]) -> None:
        """
        Initialize the KV caches and begin the model execution loop of the
        underlying workers.
        """
        self.collective_rpc("initialize_from_config", args=(kv_cache_configs,))

collective_rpc的第一个参数是vllm.v1.worker.gpu_worker.Worker中对应的函数名称。

Worker

class Worker(WorkerBase):
# https://github.com/vllm-project/vllm/blob/v0.20.1/vllm/v1/worker/gpu_worker.py#L501
    def get_kv_cache_spec(self) -> dict[str, KVCacheSpec]:
        return self.model_runner.get_kv_cache_spec()

# https://github.com/vllm-project/vllm/blob/v0.20.1/vllm/v1/worker/gpu_worker.py#L517
    def initialize_from_config(self, kv_cache_config: KVCacheConfig) -> None:
        """Allocate GPU KV cache with the specified kv_cache_config."""

        # Update local config with adjusted num blocks after profiling,
        # so that it's available to the warmup stage.
        self.cache_config.num_gpu_blocks = kv_cache_config.num_blocks

        # Init kv cache connector here, because it requires
        # `kv_cache_config`.
        # NOTE(Kuntai): This need to be done before `initialize_kv_cache`,
        # because `initialize_kv_cache` will inject kv cache groups not
        # related to kv cache connector (e.g. kv cache sharing layers).
        ensure_kv_transfer_initialized(self.vllm_config, kv_cache_config)

        if self.vllm_config.model_config.enable_sleep_mode:
            from vllm.device_allocator.cumem import CuMemAllocator

            allocator = CuMemAllocator.get_instance()
            with allocator.use_memory_pool(tag="kv_cache"):
                self.model_runner.initialize_kv_cache(kv_cache_config)
        else:
            self.model_runner.initialize_kv_cache(kv_cache_config)

model_runner

class GPUModelRunner(
    LoRAModelRunnerMixin, KVConnectorModelRunnerMixin, ECConnectorModelRunnerMixin
):
# https://github.com/vllm-project/vllm/blob/v0.20.1/vllm/v1/worker/gpu_model_runner.py#L6522
    def _allocate_kv_cache_tensors(
        self, kv_cache_config: KVCacheConfig
    ) -> dict[str, torch.Tensor]:
        """
        Initializes the KV cache buffer with the correct size. The buffer needs
        to be reshaped to the desired shape before being used by the models.

        Args:
            kv_cache_config: The KV cache config
        Returns:
            dict[str, torch.Tensor]: A map between layer names to their
            corresponding memory buffer for KV cache.
        """
        kv_cache_raw_tensors: dict[str, torch.Tensor] = {}
        for kv_cache_tensor in kv_cache_config.kv_cache_tensors:
            tensor = torch.zeros(
                kv_cache_tensor.size, dtype=torch.int8, device=self.device
            )
            for layer_name in kv_cache_tensor.shared_by:
                kv_cache_raw_tensors[layer_name] = tensor

        layer_names = set()
        for group in kv_cache_config.kv_cache_groups:
            for layer_name in group.layer_names:
                if layer_name in self.runner_only_attn_layers:
                    continue
                layer_names.add(layer_name)
        assert layer_names == set(kv_cache_raw_tensors.keys()), (
            "Some layers are not correctly initialized"
        )
        return kv_cache_raw_tensors

# https://github.com/vllm-project/vllm/blob/v0.20.1/vllm/v1/worker/gpu_model_runner.py#L6724    
    def initialize_kv_cache_tensors(
        self, kv_cache_config: KVCacheConfig, kernel_block_sizes: list[int]
    ) -> dict[str, torch.Tensor]:
        # Try creating KV caches optimized for kv-connector transfers
        cache_dtype = self.cache_config.cache_dtype
        if self.use_uniform_kv_cache(self.attn_groups, cache_dtype):
            kv_caches, cross_layers_kv_cache, attn_backend = (
                self.allocate_uniform_kv_caches(
                    kv_cache_config,
                    self.attn_groups,
                    cache_dtype,
                    self.device,
                    kernel_block_sizes,
                )
            )
            self.cross_layers_kv_cache = cross_layers_kv_cache
            self.cross_layers_attn_backend = attn_backend
        else:
            # Fallback to the general case
            # Initialize the memory buffer for KV cache
            kv_cache_raw_tensors = self._allocate_kv_cache_tensors(kv_cache_config)

            # Change the memory buffer to the desired shape
            kv_caches = self._reshape_kv_cache_tensors(
                kv_cache_raw_tensors, kernel_block_sizes


        bind_kv_cache(
            kv_caches,
            self.compilation_config.static_forward_context,
            self.kv_caches,
            num_attn_module,
        )
        return kv_caches

# https://github.com/vllm-project/vllm/blob/v0.20.1/vllm/v1/worker/gpu_model_runner.py#L6807
    def initialize_kv_cache(
        self,
        kv_cache_config: KVCacheConfig,
        is_profiling: bool = False,
    ) -> None:
        kv_caches = self.initialize_kv_cache_tensors(
            kv_cache_config, kernel_block_sizes
        )

# https://github.com/vllm-project/vllm/blob/v0.20.1/vllm/v1/worker/gpu_model_runner.py#L6944
    def get_kv_cache_spec(self) -> dict[str, KVCacheSpec]:
        """
        Generates the KVCacheSpec by parsing the kv cache format from each
        Attention module in the static forward context.
        Returns:
            KVCacheSpec: A dictionary mapping layer names to their KV cache
            format. Layers that do not need KV cache are not included.
        """
        if has_ec_transfer() and not get_ec_transfer().is_consumer:
            return {}
        kv_cache_spec: dict[str, KVCacheSpec] = {}
        layer_type = cast(type[Any], AttentionLayerBase)
        attn_layers = get_layers_from_vllm_config(self.vllm_config, layer_type)
        for layer_name, attn_module in attn_layers.items():
            if isinstance(attn_module, Attention) and (
                kv_tgt_layer := attn_module.kv_sharing_target_layer_name
            ):
                # The layer doesn't need its own KV cache and will use that of
                # the target layer. We skip creating a KVCacheSpec for it, so
                # that KV cache management logic will act as this layer does
                # not exist, and doesn't allocate KV cache for the layer. This
                # enables the memory saving of cross-layer kv sharing, allowing
                # a given amount of memory to accommodate longer context lengths
                # or enable more requests to be processed simultaneously.
                self.shared_kv_cache_layers[layer_name] = kv_tgt_layer
                continue
            # Skip modules that don't need KV cache (eg encoder-only attention)
            if spec := attn_module.get_kv_cache_spec(self.vllm_config):
                kv_cache_spec[layer_name] = spec

        return kv_cache_spec

# https://github.com/vllm-project/vllm/blob/v0.20.1/vllm/v1/worker/utils.py#L457
def bind_kv_cache(
    kv_caches: dict[str, torch.Tensor],
    forward_context: dict[str, Attention],
    runner_kv_caches: list[torch.Tensor],
    num_attn_module: int = 1,
) -> None:
    """
    Bind the allocated KV cache to both ModelRunner and forward context so
    that the KV cache can be used in the forward pass.

    This function:
      1) Fills the ModelRunner's kv cache list (`runner_kv_caches`) with
         kv_caches.
      2) Associates each attention layer in the `forward_context` with its
         corresponding KV cache in kv_caches.

    Args:
        kv_caches: The allocated kv_caches with layer names as keys.
        forward_context: The global forward context containing all Attention
            layers with layer names as keys.
        runner_kv_caches: The kv_cache declared by ModelRunner.
    """
    # Bind kv_caches to ModelRunner
    assert len(runner_kv_caches) == 0

    # Convert kv_caches dict to a list of tensors in the order of layer_index.
    index2name = defaultdict(list)
    for layer_name in kv_caches:
        index2name[extract_layer_index(layer_name, num_attn_module)].append(layer_name)

    for layer_index in sorted(index2name.keys()):
        layer_names = index2name[layer_index]
        if len(layer_names) > 1:
            # One typical case is encoder-decoder model, e.g., bart.
            # The cross attention and self attention in the same decoder layer
            # has different layer_name but the same layer_index.

            # TODO - analyze where runner_kv_caches is used and the right
            # way to ensure it properly reflects multiple attention layers
            # in the same decoder block.
            if (
                current_platform.is_cuda_alike()
                or current_platform.is_xpu()
                or current_platform.is_cpu()
            ):
                # We know that the GPU / CPU runner is not impacted by this
                # case. Some test code depends on runner_kv_caches, but
                # not in a way that's impacted by ignoring this.
                pass
            else:
                raise NotImplementedError
        for layer_name in layer_names:
            runner_kv_caches.append(kv_caches[layer_name])

    # Bind kv_caches to forward context
    for layer_name, kv_cache in kv_caches.items():
        forward_context[layer_name].kv_cache = kv_cache  

_allocate_kv_cache_tensors 处理流程图

开始:_allocate_kv_cache_tensors(kv_cache_config)
   │
   ├─> 输入参数:kv_cache_config (包含 kv_cache_tensors 和 kv_cache_groups)
   │
   ├─> 步骤 1:分配共享张量
   │       │
   │       ├─> 初始化空字典 kv_cache_raw_tensors = {}
   │       │
   │       └─> 遍历 kv_cache_config.kv_cache_tensors:
   │               │
   │               ├─> 对每个 kv_cache_tensor:
   │               │       ├─> tensor = torch.zeros(size, dtype=torch.int8, device=self.device)
   │               │       │       └─> 在 GPU(或指定设备)上分配全零的 int8 字节缓冲区
   │               │       │
   │               │       └─> 遍历 kv_cache_tensor.shared_by(共享该张量的所有层名):
   │               │               └─> kv_cache_raw_tensors[layer_name] = tensor
   │               │                     (多个层名指向同一个 tensor 对象 → 内存共享)
   │               │
   │               └─> 继续下一个 kv_cache_tensor
   │
   ├─> 步骤 2:验证层完整性
   │       │
   │       ├─> 收集期望的层名集合 layer_names = set()
   │       │       │
   │       │       └─> 遍历 kv_cache_config.kv_cache_groups:
   │       │               └─> 遍历 group.layer_names:
   │       │                       └─> 如果 layer_name 不在 self.runner_only_attn_layers 中:
   │       │                               └─> layer_names.add(layer_name)
   │       │
   │       ├─> 获取实际分配了内存的层名集合 set(kv_cache_raw_tensors.keys())
   │       │
   │       └─> 断言 layer_names == 实际分配层名集合
   │               └─> 若不一致 → 抛出 AssertionError,说明某些层未正确分配
   │
   ├─> 步骤 3:返回结果
   │       │
   │       └─> 返回 kv_cache_raw_tensors (dict[layer_name, torch.Tensor])
   │               └─> 注意:此时张量仍是未整形的 int8 原始缓冲区
   │
结束

reference

[1] 图解Vllm V1系列3:KV Cache初始化

Logo

免费领 100 小时云算力,进群参与显卡、AI PC 幸运抽奖

更多推荐