vLLM v1 KV Offload 模块 — 超深度架构分析(二):核心业务逻辑逐行解析

分析基于源码目录:github.com/vllm/vllm/v1/kv_offload


图 3:初始化流程图

Yes

No

Yes

No

Scheduler

No

Yes

No

Yes

Worker

No

No

Yes

Yes

No

系统启动

加载 VllmConfig
读取 kv_transfer_config

OffloadingSpecFactory.create_spec()

spec_name
在注册表中?

_registry[spec_name]()
延迟加载模块

spec_module_path
已配置?

importlib.import_module(path)
动态加载

ValueError:
Unsupported spec type

new CPUOffloadingSpec(config, kv_cache_config)

计算 CPU 块数:
num_blocks = cpu_bytes_to_use / kv_bytes_per_offloaded_block

OffloadingSpec 实例就绪

Scheduler / Worker 分支

spec.get_manager()

_manager
已创建?

new CPUOffloadingManager(num_blocks, policy)

store_threshold >= 2?

FilterReusedOffloadingManager(backing=manager)

Manager 就绪

spec.get_handlers(kv_caches)

_handlers
已创建?

is_cuda_alike()?

Exception:
Only CUDA supported

new CpuGpuOffloadingHandlers()

分配 CPU 张量
(mmap 或 torch.zeros)

mmap_region
已提供?

mmap_region.create_next_view()

torch.zeros(pin_memory=True)

pin_mmap_region()

创建两个 SingleDirectionOffloadingHandler
gpu_to_cpu (GPU→CPU)
cpu_to_gpu (CPU→GPU)

yield (GPULoadStoreSpec, CPULoadStoreSpec, gpu_to_cpu_handler)

yield (CPULoadStoreSpec, GPULoadStoreSpec, cpu_to_gpu_handler)

OffloadingWorker.register_handler()

系统就绪


第一章 base.py — 核心抽象基类与数据结构

1.1 OffloadKey — 卸载块的唯一标识

# 第24-27行:OffloadKey 类型定义
# `OffloadKey` identifies an offloaded block. It combines a block hash with
# its KV cache group index, encoded as raw bytes to avoid tuple GC overhead.
# Use the helper functions below to construct / decompose keys.
OffloadKey = NewType("OffloadKey", bytes)

逐行解析:

  • OffloadKeybytes 的 NewType 别名,不是独立的类
  • 设计意图:避免 Python tuple 的 GC 开销。如果用 (block_hash, group_idx) 元组,每次创建都会触发 Python 对象分配和 GC 追踪;而 bytes 是不可变序列,分配更高效
  • 包含两部分信息:block_hash(块的哈希值,用于前缀缓存匹配)+ group_idx(KV 缓存组索引,支持混合模型如 Mamba+Attention)
# 第32-44行:OffloadKey 的构造与分解函数
def make_offload_key(block_hash: bytes, group_idx: int) -> OffloadKey:
    """Pack a block hash and group index into an `OffloadKey`."""
    # 将 group_idx 编码为 4 字节大端无符号整数,拼接到 block_hash 后面
    # 大端序保证跨平台一致性;4 字节足够容纳任何合理的 group_idx
    return OffloadKey(block_hash + group_idx.to_bytes(4, "big", signed=False))

def get_offload_block_hash(key: OffloadKey) -> bytes:
    """Extract the block hash from an `OffloadKey`."""
    # 最后 4 字节是 group_idx,前面的都是 block_hash
    return key[:-4]

def get_offload_group_idx(key: OffloadKey) -> int:
    """Extract the group index from an `OffloadKey`."""
    # 解码最后 4 字节为大端无符号整数
    return int.from_bytes(key[-4:], "big", signed=False)

内存布局:

OffloadKey (bytes)
┌──────────────────────────────┬──────────────┐
│      block_hash (变长)        │  group_idx   │
│  N bytes (前缀缓存哈希)       │  4 bytes     │
│                              │  (大端序)     │
└──────────────────────────────┴──────────────┘
                               ↑ key[-4:]
               key[:-4] ───────┘

1.2 ReqContext — 请求上下文

# 第47-49行
@dataclass
class ReqContext:
    kv_transfer_params: dict[str, Any] | None = None
    # 每请求级别的 KV 传输参数(如分布式推理中的节点间传输参数)
    # 可选字段,None 表示无特殊传输需求

1.3 LoadStoreSpec — 加载/存储规格抽象

# 第52-65行
class LoadStoreSpec(ABC):
    """
    Abstract metadata that encapsulates information allowing a worker
    to load, and optionally also to store, blocks of KV data.
    """
    @staticmethod
    @abstractmethod
    def medium() -> str:
        """
        Returns a string representation of the medium type
        this store/load targets.
        """
        pass
        # 静态方法,返回存储介质类型标识
        # GPU → "GPU", CPU → "CPU"
        # 用于 OffloadingWorker 路由传输到正确的 Handler

设计意图: LoadStoreSpec 是连接 Manager(控制面)和 Handler(数据面)的关键数据结构。Manager 决定"哪些块需要搬移"后,将块的 ID 封装为 LoadStoreSpec,传递给 Handler 执行实际的 DMA 传输。

1.4 PrepareStoreOutput — 存储准备结果

# 第68-72行
@dataclass
class PrepareStoreOutput:
    keys_to_store: list[OffloadKey]     # 需要存储的块键列表(已过滤掉已存在的块)
    store_spec: LoadStoreSpec           # 存储规格(告诉 Worker 写到哪些 CPU 块)
    evicted_keys: list[OffloadKey]      # 因空间不足被驱逐的块键列表

1.5 OffloadingEvent — 卸载事件

# 第75-80行
@dataclass
class OffloadingEvent:
    keys: list[OffloadKey]    # 涉及的块键
    medium: str               # 介质类型("CPU")
    removed: bool             # True=块被驱逐(删除),False=块被存储(新增)

1.6 OffloadingManager — 核心管理器抽象

# 第110-217行:完整的 OffloadingManager 抽象类
class OffloadingManager(ABC):
    @abstractmethod
    def lookup(self, key: OffloadKey, req_context: ReqContext) -> bool | None:
        """
        Checks whether a single block is offloaded and ready to be read.
        
        返回值三态逻辑:
        - True: 块已卸载且可读(数据就绪)
        - False: 块未卸载(不在缓存中)
        - None: 块已卸载但正在写入中,调用者应稍后重试
                返回 None 会导致 Scheduler 延迟该请求的处理
        """
        pass

    @abstractmethod
    def prepare_load(
        self,
        keys: Collection[OffloadKey],
        req_context: ReqContext,
    ) -> LoadStoreSpec:
        """
        准备加载操作:
        1. 假设所有给定的 keys 都已卸载(调用者应先 lookup 确认)
        2. 对每个块增加引用计数,防止加载期间被驱逐
        3. 返回 LoadStoreSpec 告诉 Worker 从哪些 CPU 块读取
        
        注意:调用者必须在加载完成后调用 complete_load 释放引用计数
        """
        pass

    def touch(self, keys: Collection[OffloadKey]):
        """标记块为最近使用(默认空实现,策略层覆盖)"""
        return

    def complete_load(self, keys: Collection[OffloadKey]):
        """加载完成,释放引用计数(默认空实现,子类覆盖)"""
        return

    @abstractmethod
    def prepare_store(
        self,
        keys: Collection[OffloadKey],
        req_context: ReqContext,
    ) -> PrepareStoreOutput | None:
        """
        准备存储操作:
        1. 过滤掉已存储的块
        2. 检查空间是否足够,不足则触发驱逐
        3. 分配 CPU 块,注册到策略
        4. 返回 PrepareStoreOutput(含存储规格和驱逐列表)
        5. None 表示无法存储(空间不足且无法驱逐)
        """
        pass

    def complete_store(self, keys: Collection[OffloadKey], success: bool = True):
        """
        存储完成:
        - success=True: 标记块为 is_ready=True(可供后续 lookup 命中)
        - success=False: 从策略中移除块,回收 CPU 块
        """
        return

    def take_events(self) -> Iterable[OffloadingEvent]:
        """获取事件流(默认空迭代器)"""
        return ()

    def shutdown(self) -> None:
        """关闭管理器,释放资源(默认空实现)"""
        return

1.7 BlockIDsLoadStoreSpec 与 GPULoadStoreSpec

# 第219-267行
class BlockIDsLoadStoreSpec(LoadStoreSpec, ABC):
    """基于块 ID 的 LoadStoreSpec 基类"""
    def __init__(self, block_ids: list[int]):
        # 使用 numpy int64 数组存储,而不是 Python list
        # 原因:后续需要传给 compute_sub_block_ptrs 进行向量化的指针计算
        self.block_ids = np.array(block_ids, dtype=np.int64)

class GPULoadStoreSpec(BlockIDsLoadStoreSpec):
    """
    GPU 侧加载/存储规格。
    
    核心问题:当 CPU 卸载块大小 ≠ GPU 块大小时(block_size_factor > 1),
    一个 CPU 块对应多个 GPU 子块。需要额外信息来正确映射。
    
    三个关键字段:
    """
    def __init__(
        self,
        block_ids: list[int],         # GPU 块 ID 列表
        group_sizes: Sequence[int],   # 每个 KV 组的 GPU 块数量
        block_indices: Sequence[int], # 每个组内第一个块的逻辑索引
    ):
        super().__init__(block_ids)
        # 约束校验
        assert sum(group_sizes) == len(block_ids)  # 组大小之和等于总块数
        assert len(block_indices) == len(group_sizes)  # 组数一致
        self.group_sizes: Sequence[int] = group_sizes
        self.block_indices: Sequence[int] = block_indices
        # block_indices 的用途:
        # 当 block_size_factor > 1 时,请求的第一个 GPU 块可能不是 CPU 块的起始位置
        # block_indices[i] % block_size_factor = 该组首块在 CPU 块内的偏移(子块数)
        # 这决定了 DMA 传输时需要跳过多少数据

GPU/CPU 块映射关系:

GPU 块(小块,block_size = 16 tokens)
┌────┬────┬────┬────┬────┬────┐
│ G0 │ G1 │ G2 │ G3 │ G4 │ G5 │  ← 6 个 GPU 块
└────┴────┴────┴────┴────┴────┘
  │    │    │    │    │    │
  ▼    ▼    ▼    ▼    ▼    ▼
CPU 块(大块,block_size_factor = 2)
┌─────────────┬─────────────┬─────────────┐
│   C0        │   C1        │   C2        │  ← 3 个 CPU 块
│ (G0+G1)     │ (G2+G3)     │ (G4+G5)     │
└─────────────┴─────────────┴─────────────┘

如果请求从 G2 开始(block_indices = 2):
  block_indices % block_size_factor = 2 % 2 = 0  → 无需跳过
如果请求从 G3 开始(block_indices = 3):
  block_indices % block_size_factor = 3 % 2 = 1  → 需跳过 C1 的前半部分

1.8 CanonicalKVCache 数据结构族

# 第269-317行
@dataclass
class CanonicalKVCacheTensor:
    """规范化 KV 缓存张量,第一维为 num_blocks"""
    tensor: torch.Tensor        # shape: (num_blocks, page_size_bytes), dtype=int8
    page_size_bytes: int        # 每个块的(可能含填充的)字节大小

@dataclass
class CanonicalKVCacheRef:
    """每层/层组对特定 CanonicalKVCacheTensor 的引用"""
    tensor_idx: int             # 在 tensors 列表中的索引
    page_size_bytes: int        # 该层使用的实际(未填充的)页大小

@dataclass
class CanonicalKVCaches:
    """
    规范化的块级 KV 缓存表示。
    
    设计原因:不同 attention 后端的原始张量布局不同:
    - FlashAttention: (2, num_blocks, num_heads, head_size) — K和V拼接在第一维
    - 其他后端: (num_blocks, ...)
    
    CanonicalKVCaches 将所有布局统一为 (num_blocks, page_size_bytes) 的 int8 视图,
    使 offload 逻辑与具体 attention 实现解耦。
    """
    tensors: list[CanonicalKVCacheTensor]             # 唯一张量列表
    group_data_refs: list[list[CanonicalKVCacheRef]]  # 每组每层的引用映射

CanonicalKVCaches 映射示例:

原始 KV 缓存(FlashAttention 布局):
  Tensor A: shape (2, 1000, 32, 128) → 拆分为 2 个 CanonicalKVCacheTensor
    Tensor A_K: shape (1000, 4096) [int8 view]  → tensors[0]
    Tensor A_V: shape (1000, 4096) [int8 view]  → tensors[1]
  Tensor B: shape (2, 1000, 8, 128) → 拆分
    Tensor B_K: shape (1000, 1024) [int8 view]  → tensors[2]
    Tensor B_V: shape (1000, 1024) [int8 view]  → tensors[3]

group_data_refs:
  Group 0 (Attention layers 0-31):
    [CanonicalKVCacheRef(tensor_idx=0, page_size_bytes=4096),   → A_K
     CanonicalKVCacheRef(tensor_idx=1, page_size_bytes=4096)]   → A_V
  Group 1 (Attention layers 32-39):
    [CanonicalKVCacheRef(tensor_idx=2, page_size_bytes=1024),   → B_K
     CanonicalKVCacheRef(tensor_idx=3, page_size_bytes=1024)]   → B_V

1.9 OffloadingSpec — 卸载规格抽象

# 第319-398行
class OffloadingSpec(ABC):
    """Spec for an offloading connector"""
    def __init__(self, vllm_config: VllmConfig, kv_cache_config: KVCacheConfig):
        # 实验性 API 警告
        logger.warning(
            "Initializing OffloadingSpec. This API is experimental and "
            "subject to change in the future as we iterate the design."
        )
        self.vllm_config = vllm_config
        self.kv_cache_config = kv_cache_config

        kv_transfer_config = vllm_config.kv_transfer_config
        assert kv_transfer_config is not None  # 必须配置 KV 传输
        self.extra_config = kv_transfer_config.kv_connector_extra_config  # 额外配置字典

        parallel_config = vllm_config.parallel_config
        # 上下文并行因子:decode_context_parallel × prefill_context_parallel
        # 影响块大小的计算
        context_parallel_factor = (
            parallel_config.decode_context_parallel_size
            * parallel_config.prefill_context_parallel_size
        )

        # vLLM 用于前缀缓存哈希的块大小(以 token 为单位)
        # = cache_config.block_size × context_parallel_factor
        self.hash_block_size = (
            vllm_config.cache_config.block_size * context_parallel_factor
        )
        
        # 每个 KV 缓存组的 GPU 块大小(以 token 为单位)
        self.gpu_block_size: tuple[int, ...] = tuple(
            kv_cache_group.kv_cache_spec.block_size * context_parallel_factor
            for kv_cache_group in kv_cache_config.kv_cache_groups
        )

        # 校验:所有 GPU 块大小必须能被哈希块大小整除
        # 混合模型(如 Mamba+Attention)不同组可能有不同 block_size
        # 但都必须与哈希块大小对齐,否则前缀缓存无法正确匹配
        for block_size in self.gpu_block_size:
            assert block_size % self.hash_block_size == 0, (
                f"gpu_block_size={block_size} not divisible by "
                f"hash_block_size={self.hash_block_size}. "
                f"Hybrid models (e.g. Mamba+Attention) need "
                f"--enable-prefix-caching to align block sizes."
            )

        # 卸载块大小与 GPU 块大小的比率,默认 1(等大)
        self.block_size_factor: int = 1
        offloaded_block_size = self.extra_config.get("block_size")
        if offloaded_block_size is not None:
            # 用户自定义了卸载块大小
            offloaded_block_size_int = int(offloaded_block_size)
            gpu_block_sizes = set(self.gpu_block_size)
            # 如果指定了自定义块大小,要求所有 KV 组的 GPU 块大小相同
            assert len(gpu_block_sizes) == 1, (...)
            gpu_block_size = gpu_block_sizes.pop()
            # 卸载块大小必须是 GPU 块大小的整数倍
            assert offloaded_block_size_int % gpu_block_size == 0
            self.block_size_factor = offloaded_block_size_int // gpu_block_size

第二章 factory.py — 工厂模式实现

# 第17-58行:完整的 OffloadingSpecFactory
class OffloadingSpecFactory:
    # 类变量注册表:name → lazy loader 函数
    _registry: dict[str, Callable[[], type[OffloadingSpec]]] = {}

    @classmethod
    def register_spec(cls, name: str, module_path: str, class_name: str) -> None:
        """注册一个延迟加载的 Spec"""
        if name in cls._registry:
            raise ValueError(f"Connector '{name}' is already registered.")
        # 使用闭包实现延迟加载:注册时不导入模块,只在创建时才导入
        def loader() -> type[OffloadingSpec]:
            module = importlib.import_module(module_path)
            return getattr(module, class_name)
        cls._registry[name] = loader

    @classmethod
    def create_spec(
        cls,
        config: "VllmConfig",
        kv_cache_config: "KVCacheConfig",
    ) -> OffloadingSpec:
        """根据配置创建 OffloadingSpec 实例"""
        kv_transfer_config = config.kv_transfer_config
        assert kv_transfer_config is not None
        
        extra_config = kv_transfer_config.kv_connector_extra_config
        # 从配置中读取 spec 名称,默认为 "CPUOffloadingSpec"
        spec_name = extra_config.get("spec_name", "CPUOffloadingSpec")
        
        if spec_name in cls._registry:
            # 优先从注册表查找(延迟加载)
            spec_cls = cls._registry[spec_name]()
        else:
            # 回退到动态模块路径加载
            spec_module_path = extra_config.get("spec_module_path")
            if spec_module_path is None:
                raise ValueError(f"Unsupported spec type: {spec_name}")
            spec_module = importlib.import_module(spec_module_path)
            spec_cls = getattr(spec_module, spec_name)
        
        assert issubclass(spec_cls, OffloadingSpec)
        logger.info("Creating offloading spec with name: %s", spec_name)
        return spec_cls(config, kv_cache_config)

# 内置注册:CPUOffloadingSpec
OffloadingSpecFactory.register_spec(
    "CPUOffloadingSpec", "vllm.v1.kv_offload.cpu.spec", "CPUOffloadingSpec"
)

工厂决策流程:

                    spec_name from config
                           │
              ┌────────────┼────────────┐
              ▼            │            ▼
        In registry?       │      Not in registry
         │                 │           │
    loader() → spec_cls    │   spec_module_path?
         │                 │      │          │
         │                 │   exists      None
         │                 │      │          │
         ▼                 ▼      ▼          ▼
    spec_cls(config, kv_cache_config)   ValueError

第三章 reuse_manager.py — 复用频率过滤装饰器

3.1 FilterReusedOffloadingManager 完整逐行解析

# 第23-121行
class FilterReusedOffloadingManager(OffloadingManager):
    """
    OffloadingManager 装饰器:跳过复用频率低于阈值的块的存储。
    
    核心动机:只用过一次的块(如一次性 prefill 的 KV Cache)
    不值得卸载到 CPU——因为几乎不会再被重用,卸载反而浪费 CPU 内存和 DMA 带宽。
    只有被 lookup() 命中足够次数的块才有"投资"卸载的价值。
    
    两个拦截点:
    1. lookup() — 记录访问计数
    2. prepare_store() — 过滤低频块
    """

    def __init__(
        self,
        backing: OffloadingManager,       # 被装饰的底层 Manager
        store_threshold: int = 2,          # 存储阈值(默认 2 = 至少被 lookup 2 次)
        max_tracker_size: int = 64_000,    # 计数器最大容量(LRU 淘汰)
    ):
        # 参数校验
        if store_threshold < 2:
            # 阈值为 1 等于不过滤(第一次 lookup 就允许存储),无意义
            raise ValueError(...)
        if max_tracker_size < 1:
            raise ValueError(...)
        
        self._backing = backing
        self.store_threshold = store_threshold
        self.max_tracker_size = max_tracker_size
        # 使用 OrderedDict 实现 O(1) 的 LRU 淘汰
        # key: OffloadKey, value: 访问计数
        self.counts: OrderedDict[OffloadKey, int] = OrderedDict()

    # ------------------------------------------------------------------
    # 拦截的方法
    # ------------------------------------------------------------------

    def lookup(self, key: OffloadKey, req_context: ReqContext) -> bool | None:
        """记录 key 的访问计数,然后委托给底层 Manager"""
        if key in self.counts:
            # 已存在:移到末尾(标记为最近使用)并增加计数
            self.counts.move_to_end(key)
            self.counts[key] += 1
        else:
            # 新 key:检查容量,必要时淘汰最老的条目
            if len(self.counts) >= self.max_tracker_size:
                self.counts.popitem(last=False)  # FIFO:淘汰最早的
            self.counts[key] = 1
        
        # 委托给底层 Manager 的 lookup
        return self._backing.lookup(key, req_context)

    def prepare_store(
        self, keys: Collection[OffloadKey], req_context: ReqContext
    ) -> PrepareStoreOutput | None:
        """过滤低频块后再委托给底层 Manager"""
        # 只保留访问次数 >= store_threshold 的块
        eligible = [
            key for key in keys if self.counts.get(key, 0) >= self.store_threshold
        ]
        # 即使 eligible 为空也是安全的——CPUOffloadingManager 能正确处理空列表
        return self._backing.prepare_store(eligible, req_context)

    # ------------------------------------------------------------------
    # 直接委托的方法(无拦截逻辑)
    # ------------------------------------------------------------------

    def prepare_load(self, keys, req_context) -> LoadStoreSpec:
        return self._backing.prepare_load(keys, req_context)

    def touch(self, keys) -> None:
        return self._backing.touch(keys)

    def complete_load(self, keys) -> None:
        return self._backing.complete_load(keys)

    def complete_store(self, keys, success=True) -> None:
        return self._backing.complete_store(keys, success)

    def take_events(self) -> Iterable[OffloadingEvent]:
        return self._backing.take_events()

频率过滤决策流程:

lookup(key) 被调用
       │
       ▼
  counts 中已存在? ──Yes──→ move_to_end(key); counts[key] += 1
       │
      No
       │
       ▼
  counts 容量已满? ──Yes──→ popitem(last=False) 淘汰最老
       │
      No
       │
       ▼
  counts[key] = 1
       │
       ▼
  委托 _backing.lookup()

prepare_store(keys) 被调用
       │
       ▼
  eligible = [k for k in keys if counts.get(k,0) >= threshold]
       │
       ├── eligible 为空 → _backing.prepare_store([]) → 返回空的 PrepareStoreOutput
       │
       └── eligible 非空 → _backing.prepare_store(eligible) → 返回正常结果

第四章 cpu/manager.py — CPU 卸载管理器

4.1 CPUOffloadingManager 完整逐行解析

# 第25-205行
class CPUOffloadingManager(OffloadingManager):
    """
    具有可插拔 CachePolicy 的 OffloadingManager 实现。
    
    Manager 拥有所有共享逻辑:引用计数、事件发射、块池管理、
    prepare_store/complete_store 的骨架流程。
    策略相关的块组织和驱逐决策委托给 CachePolicy 实现。
    """

    def __init__(
        self,
        num_blocks: int,                      # CPU 块池大小
        cache_policy: Literal["lru", "arc"] = "lru",  # 策略选择
        enable_events: bool = False,           # 是否启用事件追踪
    ):
        self.medium: str = CPULoadStoreSpec.medium()  # "CPU"
        self._num_blocks: int = num_blocks              # 总块数(上限)
        self._num_allocated_blocks: int = 0             # 已分配块数(单调递增直到达到上限)
        self._free_list: list[int] = []                 # 已回收的块 ID 列表(可复用)
        self.events: list[OffloadingEvent] | None = [] if enable_events else None
        
        # 根据策略名选择实现类
        policy_cls = _CACHE_POLICIES.get(cache_policy)
        if policy_cls is None:
            raise ValueError(f"Unknown cache policy: {cache_policy!r}")
        self._policy: CachePolicy = policy_cls(cache_capacity=num_blocks)

块池管理机制:

_num_blocks = 10 (示例)
_num_allocated_blocks = 7
_free_list = [5, 3]

块号: 0  1  2  3  4  5  6  7  8  9
      │  │  │  │  │  │  │  │  │  │
      │  │  │  ↑  │  ↑  │  ╰──╯  ╰── 未分配区间
      │  │  │  回收  回收     │
      │  │  │                 │
      ╰──╯──╯──╰─────────────╯
       已分配且在使用中     已分配但已回收(在_free_list中)
                           等待复用

_get_num_free_blocks() = len(_free_list) + _num_blocks - _num_allocated_blocks
                        = 2 + 10 - 7 = 5
    # --- 块池操作 ---

    def _get_num_free_blocks(self) -> int:
        # 空闲块数 = 回收列表长度 + 未分配区间
        return len(self._free_list) + self._num_blocks - self._num_allocated_blocks

    def _allocate_blocks(self, keys: list[OffloadKey]) -> list[BlockStatus]:
        num_fresh = min(len(keys), self._num_blocks - self._num_allocated_blocks)
        # num_fresh: 可从未分配区间获取的新块数
        num_reused = len(keys) - num_fresh
        # num_reused: 需要从 _free_list 复用的块数
        assert len(self._free_list) >= num_reused  # 确保回收列表有足够的块

        blocks: list[BlockStatus] = []
        # 分配新块:直接用 _num_allocated_blocks 作为块 ID(递增)
        for _ in range(num_fresh):
            blocks.append(BlockStatus(self._num_allocated_blocks))
            self._num_allocated_blocks += 1

        # 复用回收块:从 _free_list 末尾 pop(LIFO,局部性更好)
        for _ in range(num_reused):
            blocks.append(BlockStatus(self._free_list.pop()))
        return blocks

    def _free_block(self, block: BlockStatus) -> None:
        # 将块 ID 放入回收列表
        self._free_list.append(block.block_id)

    def _get_load_store_spec(
        self, keys, blocks
    ) -> CPULoadStoreSpec:
        # 将 BlockStatus 列表转换为 CPULoadStoreSpec(提取 block_id 数组)
        return CPULoadStoreSpec([block.block_id for block in blocks])

4.2 lookup 实现

    def lookup(self, key: OffloadKey, req_context: ReqContext) -> bool | None:
        block = self._policy.get(key)       # 在策略中查找块
        if block is None:
            return False                     # 块不在缓存中
        if not block.is_ready:
            return None                      # 块正在写入(ref_cnt == -1),需重试
        return True                          # 块已就绪,可读取

4.3 prepare_load 实现

    def prepare_load(self, keys, req_context) -> LoadStoreSpec:
        blocks = []
        for key in keys:
            block = self._policy.get(key)
            # 断言:块必须存在(调用者应先 lookup 确认)
            assert block is not None, f"Block {key!r} not found in cache"
            # 断言:块必须就绪(不能加载正在写入的块)
            assert block.is_ready, f"Block {key!r} is not ready for reading"
            block.ref_cnt += 1               # 增加引用计数,防止加载期间被驱逐
            blocks.append(block)
        return self._get_load_store_spec(keys, blocks)

4.4 prepare_store 实现(最复杂的方法)

    def prepare_store(self, keys, req_context) -> PrepareStoreOutput | None:
        # 第一步:过滤已存储的块
        keys_to_store = [k for k in keys if self._policy.get(k) is None]
        
        if not keys_to_store:
            # 所有块都已存储,无需操作
            return PrepareStoreOutput(
                keys_to_store=[],
                store_spec=self._get_load_store_spec([], []),
                evicted_keys=[],
            )

        # 第二步:计算需要驱逐的块数
        num_blocks_to_evict = len(keys_to_store) - self._get_num_free_blocks()
        
        to_evict: list[OffloadKey] = []
        if num_blocks_to_evict > 0:
            # 需要驱逐:当前输入的块(包括已存储的)必须保留
            protected = set(keys)
            evicted = self._policy.evict(num_blocks_to_evict, protected)
            if evicted is None:
                # 驱逐失败(没有足够的可驱逐块)→ 无法存储
                return None
            for key, block in evicted:
                self._free_block(block)           # 回收块到空闲列表
                to_evict.append(key)

        # 记录驱逐事件
        if to_evict and self.events is not None:
            self.events.append(
                OffloadingEvent(keys=to_evict, medium=self.medium, removed=True)
            )

        # 第三步:分配 CPU 块
        blocks = self._allocate_blocks(keys_to_store)
        assert len(blocks) == len(keys_to_store)

        # 第四步:注册到策略(新块 ref_cnt = -1,表示"正在写入")
        for key, block in zip(keys_to_store, blocks):
            self._policy.insert(key, block)

        # 构建存储规格
        store_spec = self._get_load_store_spec(keys_to_store, blocks)
        return PrepareStoreOutput(
            keys_to_store=keys_to_store,
            store_spec=store_spec,
            evicted_keys=to_evict,
        )

4.5 complete_store 实现

    def complete_store(self, keys, success=True) -> None:
        stored_keys: list[OffloadKey] = []
        
        if success:
            # 存储成功:将块标记为就绪
            for key in keys:
                block = self._policy.get(key)
                if block is not None and not block.is_ready:
                    block.ref_cnt = 0          # 从 -1 变为 0:就绪且无引用
                    stored_keys.append(key)
        else:
            # 存储失败:删除块,回收空间
            for key in keys:
                block = self._policy.get(key)
                if block is not None and not block.is_ready:
                    self._policy.remove(key)
                    self._free_block(block)
        
        # 记录存储成功事件
        if stored_keys and self.events is not None:
            self.events.append(
                OffloadingEvent(keys=stored_keys, medium=self.medium, removed=False)
            )

4.6 BlockStatus 状态机

BlockStatus.ref_cnt 的状态流转:

  新分配(insert)          complete_store(success=True)     prepare_load
  ┌──────────┐             ┌──────────────┐               ┌──────────────┐
  │ ref_cnt  │             │ ref_cnt      │               │ ref_cnt      │
  │ = -1     │ ──────────→ │ = 0          │ ─────────────→│ += 1         │
  │ (写入中)  │             │ (就绪/空闲)   │               │ (被加载引用)  │
  └──────────┘             └──────────────┘               └──────────────┘
       │                        │                              │
       │ complete_store         │ lookup()                     │ complete_load
       │ (success=False)        │ → True                       │ ref_cnt -= 1
       ▼                        │                              ▼
  ┌──────────┐                  │                        ┌──────────────┐
  │ 从策略中  │                  │                        │ ref_cnt      │
  │ 删除      │                  │                        │ -= 1 → 0     │
  │ 回收块    │                  │                        │ (回到就绪)    │
  └──────────┘                  │                        └──────────────┘
                                │
                    lookup() → None
                    (写入中,需重试)

第五章 cpu/policies/ — 缓存替换策略

5.1 BlockStatus(policies/base.py)

class BlockStatus(ctypes.Structure):
    """
    使用 ctypes.Structure 而非 Python dataclass 的原因:
    1. 内存紧凑:c_int32(4B) + c_int64(8B) = 12 字节,Python dataclass 至少 56 字节
    2. 避免 GC 追踪:ctypes 结构体不受 Python GC 管理
    3. 缓存友好:更小的内存占用意味着 L1/L2 缓存命中率更高
    """
    _fields_ = [("ref_cnt", ctypes.c_int32), ("block_id", ctypes.c_int64)]
    
    def __init__(self, block_id: int):
        super().__init__()
        self.ref_cnt = -1         # -1 表示"正在写入,不可读"
        self.block_id = block_id  # CPU 块的物理索引
    
    @property
    def is_ready(self) -> bool:
        return self.ref_cnt >= 0  # >= 0 表示数据就绪

5.2 LRUCachePolicy(policies/lru.py)

class LRUCachePolicy(CachePolicy):
    """基于 OrderedDict 的 LRU 策略"""
    
    def __init__(self, cache_capacity: int):
        # cache_capacity 未使用——LRU 不需要容量限制(由 Manager 的块池管理)
        self.blocks: OrderedDict[OffloadKey, BlockStatus] = OrderedDict()
    
    def get(self, key: OffloadKey) -> BlockStatus | None:
        return self.blocks.get(key)  # O(1) 查找
    
    def insert(self, key: OffloadKey, block: BlockStatus) -> None:
        self.blocks[key] = block  # 插入到末尾(MRU 位置)
    
    def remove(self, key: OffloadKey) -> None:
        del self.blocks[key]
    
    def touch(self, keys: Iterable[OffloadKey]) -> None:
        # reversed:从最后(最晚的块)开始移动,保持相对顺序
        # 如果从前面开始,后面的 move_to_end 会打乱已移动的顺序
        for key in reversed(list(keys)):
            if key in self.blocks:
                self.blocks.move_to_end(key)  # 移到末尾(标记为最近使用)
    
    def evict(self, n: int, protected: set[OffloadKey]) -> list | None:
        if n == 0:
            return []
        candidates = []
        # OrderedDict 按插入顺序迭代,最前面的是最久未使用的
        for key, block in self.blocks.items():
            if block.ref_cnt == 0 and key not in protected:
                # ref_cnt == 0:块就绪且无加载引用,可安全驱逐
                # not in protected:不在当前 store 操作的输入中
                candidates.append((key, block))
                if len(candidates) == n:
                    break
        if len(candidates) < n:
            return None  # 无法驱逐足够的块 → 操作失败
        
        # 批量删除
        for key, _ in candidates:
            del self.blocks[key]
        return candidates

5.3 ARCCachePolicy(policies/arc.py)— 最复杂的策略

class ARCCachePolicy(CachePolicy):
    """
    ARC (Adaptive Replacement Cache) — 自适应替换缓存算法。
    
    四个列表:
    - T1: 最近访问的缓存(块只被访问过一次)
    - T2: 频繁访问的缓存(块被访问多次)
    - B1: T1 的幽灵列表(记录从 T1 被驱逐的块)
    - B2: T2 的幽灵列表(记录从 T2 被驱逐的块)
    - target_t1_size: T1 的自适应目标大小
    
    自适应机制:
    - B1 命中 → 说明"近期访问模式更重要"→ 增大 T1 配额
    - B2 命中 → 说明"频率访问模式更重要"→ 减小 T1 配额
    """

    def __init__(self, cache_capacity: int):
        self.cache_capacity: int = cache_capacity
        self.target_t1_size: float = 0.0  # 初始 T1 配额为 0(全部分给 T2)
        self.t1: OrderedDict[OffloadKey, BlockStatus] = OrderedDict()
        self.t2: OrderedDict[OffloadKey, BlockStatus] = OrderedDict()
        self.b1: OrderedDict[OffloadKey, None] = OrderedDict()  # 幽灵列表只记键
        self.b2: OrderedDict[OffloadKey, None] = OrderedDict()

ARC 数据结构布局:

                 cache_capacity
    ┌────────────────────────────────────────┐
    │                                        │
    │   target_t1_size                       │
    │   ◄─────────────►                      │
    │                                        │
    │   ┌────────┐  ┌──────────────────────┐ │
    │   │   T1   │  │         T2           │ │
    │   │ (最近)  │  │       (频繁)         │ │
    │   └────────┘  └──────────────────────┘ │
    └────────────────────────────────────────┘

    B1 (T1 的幽灵列表)       B2 (T2 的幽灵列表)
    ┌──────────────┐         ┌──────────────┐
    │ 从T1驱逐的key│         │ 从T2驱逐的key│
    │ (无BlockStatus)│       │ (无BlockStatus)│
    └──────────────┘         └──────────────┘

    驱逐决策:
    if |T1| >= target_t1_size → 从 T1 驱逐(加入 B1)
    else                       → 从 T2 驱逐(加入 B2)
    def get(self, key: OffloadKey) -> BlockStatus | None:
        return self.t1.get(key) or self.t2.get(key)  # 先查 T1,再查 T2

    def insert(self, key: OffloadKey, block: BlockStatus) -> None:
        self.t1[key] = block          # 新块总是进入 T1(最近列表)
        self.b1.pop(key, None)        # 如果在 B1 幽灵列表中,移除
        self.b2.pop(key, None)        # 如果在 B2 幽灵列表中,移除

    def remove(self, key: OffloadKey) -> None:
        if self.t1.pop(key, None) is None:  # 尝试从 T1 删除
            self.t2.pop(key, None)           # 不在 T1 则从 T2 删除

    def touch(self, keys: Iterable[OffloadKey]) -> None:
        """ARC 的核心自适应学习逻辑"""
        for key in reversed(list(keys)):
            if key in self.t1:
                block = self.t1.pop(key)
                if not block.is_ready:
                    # 块正在写入(刚 insert 的),不算二次访问
                    # 保持在 T1,移到 MRU 位置
                    self.t1[key] = block
                else:
                    # 块已就绪 → 从 T1 提升到 T2(从"最近"升级为"频繁")
                    self.t2[key] = block

            elif key in self.t2:
                # 已在 T2 → 移到 MRU 位置
                self.t2.move_to_end(key)

            elif key in self.b1:
                # 在 B1 幽灵列表中 → 近期模式更重要,增大 T1 配额
                delta = max(1, len(self.b2) / len(self.b1))
                self.target_t1_size = min(
                    self.target_t1_size + delta, self.cache_capacity
                )
                self.b1.move_to_end(key)  # 更新在幽灵列表中的位置

            elif key in self.b2:
                # 在 B2 幽灵列表中 → 频率模式更重要,减小 T1 配额
                delta = max(1, len(self.b1) / len(self.b2))
                self.target_t1_size = max(self.target_t1_size - delta, 0)
                self.b2.move_to_end(key)

touch() 的自适应学习流程:

touch(key)
    │
    ├── key in T1 ──→ block.is_ready?
    │                    │
    │                   Yes ──→ T1 → T2 (提升:从"最近"到"频繁")
    │                    │
    │                   No  ──→ 留在 T1,移到 MRU 位置
    │
    ├── key in T2 ──→ move_to_end (保持"频繁"标记)
    │
    ├── key in B1 ──→ target_t1_size += delta (增大"最近"配额)
    │                 delta = max(1, |B2|/|B1|)
    │                 含义:B2 越大,说明有越多"频繁"块被驱逐,
    │                 → 近期模式应更受重视
    │
    └── key in B2 ──→ target_t1_size -= delta (减小"最近"配额)
                      delta = max(1, |B1|/|B2|)
                      含义:B1 越大,说明有越多"近期"块被驱逐,
                      → 频率模式应更受重视
    def evict(self, n: int, protected: set[OffloadKey]) -> list | None:
        """ARC 的驱逐逻辑"""
        if n == 0:
            return []
        
        # 原子性:先模拟选取所有候选,确认数量足够后再实际修改
        candidates: list[tuple[OffloadKey, BlockStatus, bool]] = []  # (key, block, from_t1)
        already_selected: set[OffloadKey] = set()
        virtual_t1_size = len(self.t1)  # 虚拟的 T1 大小(模拟驱逐后的变化)

        for _ in range(n):
            candidate = None
            # 根据 target_t1_size 决定驱逐来源
            if virtual_t1_size >= int(self.target_t1_size):
                # T1 超过配额 → 从 T1 驱逐
                for key, block in self.t1.items():
                    if (block.ref_cnt == 0 
                        and key not in protected 
                        and key not in already_selected):
                        candidate = (key, block, True)
                        virtual_t1_size -= 1
                        break
            
            if candidate is None:
                # T1 不足配额 或 T1 中无可驱逐块 → 从 T2 驱逐
                for key, block in self.t2.items():
                    if (block.ref_cnt == 0 
                        and key not in protected 
                        and key not in already_selected):
                        candidate = (key, block, False)
                        break
                if candidate is None:
                    return None  # 无法驱逐足够的块

            candidates.append(candidate)
            already_selected.add(candidate[0])

        # 批量应用驱逐
        result = []
        for key, block, from_t1 in candidates:
            if from_t1:
                del self.t1[key]
                self.b1[key] = None    # 从 T1 驱逐 → 加入 B1 幽灵列表
            else:
                del self.t2[key]
                self.b2[key] = None    # 从 T2 驱逐 → 加入 B2 幽灵列表
            result.append((key, block))

        # 修剪幽灵列表到 cache_capacity
        for ghost in (self.b1, self.b2):
            for _ in range(len(ghost) - self.cache_capacity):
                ghost.popitem(last=False)  # FIFO:淘汰最老的幽灵条目

        return result

第六章 cpu/spec.py — CPU 卸载规格

class CPUOffloadingSpec(OffloadingSpec):
    def __init__(self, vllm_config: VllmConfig, kv_cache_config: KVCacheConfig):
        super().__init__(vllm_config, kv_cache_config)
        
        # 从配置中获取 CPU 内存限额(必须指定)
        cpu_bytes_to_use = self.extra_config.get("cpu_bytes_to_use")
        if not cpu_bytes_to_use:
            raise Exception("cpu_bytes_to_use must be specified")
        
        # 计算每个卸载块的 KV 字节数
        if kv_cache_config.num_blocks > 0:
            total_gpu_kv_bytes = sum(t.size for t in kv_cache_config.kv_cache_tensors)
            # 每个 GPU 块的 KV 字节数 × world_size(跨多卡的 KV 总量)
            kv_bytes_per_block = (
                total_gpu_kv_bytes // kv_cache_config.num_blocks
            ) * vllm_config.parallel_config.world_size
        else:
            kv_bytes_per_block = 0
        
        # 考虑 block_size_factor:卸载块可能比 GPU 块大
        kv_bytes_per_offloaded_block = kv_bytes_per_block * self.block_size_factor
        
        # 计算可分配的 CPU 块数
        self.num_blocks = (
            int(cpu_bytes_to_use) // kv_bytes_per_offloaded_block
            if kv_bytes_per_offloaded_block > 0
            else 0
        )
        
        # 延迟创建的组件
        self._manager: OffloadingManager | None = None
        self._handlers: CpuGpuOffloadingHandlers | None = None
        self.eviction_policy: str = self.extra_config.get("eviction_policy", "lru")

    def get_manager(self) -> OffloadingManager:
        """延迟创建 Manager(单例)"""
        if not self._manager:
            kv_events_config = self.vllm_config.kv_events_config
            enable_events = (
                kv_events_config is not None and kv_events_config.enable_kv_cache_events
            )
            
            # 创建核心 Manager
            self._manager = CPUOffloadingManager(
                num_blocks=self.num_blocks,
                cache_policy=self.eviction_policy,
                enable_events=enable_events,
            )
            
            # 可选:包装复用频率过滤装饰器
            store_threshold = int(self.extra_config.get("store_threshold", 0))
            if store_threshold >= 2:
                max_tracker_size = int(self.extra_config.get("max_tracker_size", 64_000))
                self._manager = FilterReusedOffloadingManager(
                    backing=self._manager,
                    store_threshold=store_threshold,
                    max_tracker_size=max_tracker_size,
                )
        return self._manager

    def get_handlers(self, kv_caches) -> Iterator:
        """延迟创建 Handlers(单例),yield 两个方向的处理器"""
        if not self._handlers:
            if not current_platform.is_cuda_alike():
                raise Exception("CPU Offloading is currently only supported on CUDA-alike GPUs")
            
            self._handlers = CpuGpuOffloadingHandlers(
                kv_caches=kv_caches,
                block_size_factor=self.block_size_factor,
                num_cpu_blocks=self.num_blocks,
            )
        
        # yield 两个方向:(src_type, dst_type, handler)
        yield GPULoadStoreSpec, CPULoadStoreSpec, self._handlers.gpu_to_cpu_handler
        yield CPULoadStoreSpec, GPULoadStoreSpec, self._handlers.cpu_to_gpu_handler

第七章 cpu/gpu_worker.py — GPU↔CPU 传输处理器

7.1 compute_sub_block_ptrs — 子块指针计算

def compute_sub_block_ptrs(
    block_ids: np.ndarray,          # 块 ID 数组
    block_size_factor: int,         # 每块包含的子块数
    output: np.ndarray,             # 输出:预分配的 int64 指针数组
    tensor: torch.Tensor,           # 源/目标张量
    skip_count: int = 0,            # 首块需要跳过的子块数
):
    """
    计算每个子块的字节指针。
    
    核心公式:
        ptr(sub_block j of block b) = base_ptr + b * row_stride + j * sub_block_size
    
    skip_count 的用途:
    当请求从块的中间位置开始时(block_indices % block_size_factor != 0),
    需要跳过前面的子块,只传输需要的数据。
    """
    assert skip_count < block_size_factor
    num_sub_blocks = len(output)
    base_ptr = tensor.data_ptr()          # 张量的基地址
    row_stride = tensor.stride(0)         # 行步幅(字节)

    if block_size_factor == 1:
        # 快速路径:1:1 映射,无需子块展开
        output[:] = base_ptr + block_ids[:num_sub_blocks] * row_stride
        return

    # 向量化展开(block_size_factor > 1 的情况)
    assert tensor.shape[1] % block_size_factor == 0
    sub_block_size = tensor.shape[1] // block_size_factor  # 子块大小(字节)
    sub_offsets = np.arange(block_size_factor, dtype=np.int64) * sub_block_size
    
    # 广播计算所有子块指针:
    # (num_blocks, 1) + (1, block_size_factor) → (num_blocks, block_size_factor)
    all_ptrs = (
        base_ptr + block_ids.astype(np.int64)[:, np.newaxis] * row_stride
    ) + sub_offsets[np.newaxis, :]
    
    # 展平并应用 skip_count
    flat = all_ptrs.ravel()
    output[:] = flat[skip_count : skip_count + num_sub_blocks]

子块指针计算示意:

block_size_factor = 2, skip_count = 1

CPU 块布局:
  Block 0: [sub0 | sub1]  row_stride = 2 * sub_block_size
  Block 1: [sub0 | sub1]
  Block 2: [sub0 | sub1]

GPU 块(只传输 sub1 of Block 0, sub0+sub1 of Block 1, sub0 of Block 2):

all_ptrs:
  Block 0: [ptr(0,0), ptr(0,1)]
  Block 1: [ptr(1,0), ptr(1,1)]
  Block 2: [ptr(2,0), ptr(2,1)]

flat: [ptr(0,0), ptr(0,1), ptr(1,0), ptr(1,1), ptr(2,0), ptr(2,1)]

skip_count=1: output[:] = flat[1 : 1+4] = [ptr(0,1), ptr(1,0), ptr(1,1), ptr(2,0)]

7.2 SingleDirectionOffloadingHandler — 单向传输处理器

class SingleDirectionOffloadingHandler(OffloadingHandler):
    """
    处理单方向(GPU→CPU 或 CPU→GPU)的异步传输。
    
    传输保证顺序执行:每个传输使用独立的 CUDA Stream,
    新 Stream 通过 wait_event 等待上一个传输完成后再开始。
    
    流水线:
    [Transfer1] ──wait_event──→ [Transfer2] ──wait_event──→ [Transfer3]
       │ stream1                   │ stream2                   │ stream3
       │                           │                           │
    start_ev  end_ev           start_ev  end_ev           start_ev  end_ev
    """
    
    def __init__(self, gpu_tensors, cpu_tensors, block_size_factor,
                 kv_cache_groups_data_refs, gpu_to_cpu):
        # 断言校验:GPU 张量在 CUDA 上,CPU 张量在 CPU 上
        for gpu_tensor, cpu_tensor in zip(gpu_tensors, cpu_tensors):
            assert gpu_tensor.dtype == torch.int8
            assert gpu_tensor.ndim == 2
            assert gpu_tensor.is_cuda
            assert cpu_tensor.dtype == torch.int8
            assert cpu_tensor.ndim == 2
            assert cpu_tensor.device.type == "cpu"
            # CPU 页大小 = GPU 页大小 × block_size_factor
            _, gpu_page_size = gpu_tensor.shape
            _, cpu_page_size = cpu_tensor.shape
            assert cpu_page_size == gpu_page_size * block_size_factor
        
        # 根据方向设置源/目标
        self.src_tensors = gpu_tensors if gpu_to_cpu else cpu_tensors
        self.dst_tensors = cpu_tensors if gpu_to_cpu else gpu_tensors
        self.gpu_to_cpu = gpu_to_cpu
        
        # 块大小因子(方向相关)
        # GPU→CPU: src_block_size_factor=1 (GPU块), dst_block_size_factor=block_size_factor (CPU块)
        # CPU→GPU: src_block_size_factor=block_size_factor (CPU块), dst_block_size_factor=1 (GPU块)
        self.src_block_size_factor = 1 if self.gpu_to_cpu else block_size_factor
        self.dst_block_size_factor = block_size_factor if self.gpu_to_cpu else 1
        
        self.transfer_type = ("GPU", "CPU") if self.gpu_to_cpu else ("CPU", "GPU")
        
        # 传输管理数据结构
        self._transfer_events: dict[int, torch.Event] = {}  # job_id → end_event
        self._transfers: deque[Transfer] = deque()           # 有序传输队列
        self._stream_pool: list[torch.cuda.Stream] = []     # 可复用的 CUDA Stream
        self._event_pool: list[torch.Event] = []            # 可复用的 CUDA Event

7.3 transfer_async — 异步传输提交

    def transfer_async(self, job_id: int, transfer_spec: TransferSpec) -> bool:
        src_spec, dst_spec = transfer_spec
        
        # 提取 GPU 规格信息(无论方向,GPU spec 总是包含 group 信息)
        gpu_spec = src_spec if self.gpu_to_cpu else dst_spec
        group_sizes = gpu_spec.group_sizes
        block_indices = gpu_spec.block_indices
        
        # 计算总 copy 操作数 = sum(group_size × len(group_data_refs))
        num_copy_ops = 0
        for group_size, group_data_refs in zip(group_sizes, self.kv_cache_groups_data_refs):
            num_copy_ops += group_size * len(group_data_refs)
        
        # 预分配数组
        all_src = np.empty(num_copy_ops, dtype=np.int64)   # 源指针
        all_dst = np.empty(num_copy_ops, dtype=np.int64)   # 目标指针
        all_sizes = np.empty(num_copy_ops, dtype=np.int64) # 每操作的字节数
        
        # 逐组计算指针
        src_offset = 0
        dst_offset = 0
        op_idx = 0
        num_transfer_bytes = 0
        
        for group_size, block_idx, group_data_refs in zip(
            group_sizes, block_indices, self.kv_cache_groups_data_refs
        ):
            if group_size == 0:
                continue
            
            # 计算首块偏移(block_idx % block_size_factor = 子块内偏移)
            src_logical_blocks_to_skip = block_idx % self.src_block_size_factor
            dst_logical_blocks_to_skip = block_idx % self.dst_block_size_factor
            
            # 扩展逻辑块数(含首块偏移部分)
            src_logical_blocks_count = group_size + src_logical_blocks_to_skip
            dst_logical_blocks_count = group_size + dst_logical_blocks_to_skip
            
            # 计算实际需要的物理块数(向上取整)
            dst_blocks_count = cdiv(dst_logical_blocks_count, self.dst_block_size_factor)
            src_blocks_count = cdiv(src_logical_blocks_count, self.src_block_size_factor)
            
            # 提取当前组的块 ID 子集
            group_src = src_blocks[src_offset:src_offset + src_blocks_count]
            group_dst = dst_blocks[dst_offset:dst_offset + dst_blocks_count]
            
            # 对每个 data_ref(每个 layer/layer 组)计算指针
            for data_ref in group_data_refs:
                t_idx = data_ref.tensor_idx
                end_idx = op_idx + group_size
                
                # 计算源和目标的子块指针
                compute_sub_block_ptrs(
                    group_src, self.src_block_size_factor,
                    all_src[op_idx:end_idx],
                    self.src_tensors[t_idx],
                    skip_count=src_logical_blocks_to_skip,
                )
                compute_sub_block_ptrs(
                    group_dst, self.dst_block_size_factor,
                    all_dst[op_idx:end_idx],
                    self.dst_tensors[t_idx],
                    skip_count=dst_logical_blocks_to_skip,
                )
                
                all_sizes[op_idx:end_idx] = data_ref.page_size_bytes
                num_transfer_bytes += group_size * data_ref.page_size_bytes
                op_idx = end_idx
            
            src_offset += src_blocks_count
            dst_offset += dst_blocks_count
        
        # 转换为 torch 张量(零拷贝,共享 numpy 内存)
        batch_src = torch.from_numpy(all_src)
        batch_dst = torch.from_numpy(all_dst)
        batch_sizes = torch.from_numpy(all_sizes)
        
        # 从对象池获取或创建新的 Stream 和 Event
        stream = self._stream_pool.pop() if self._stream_pool else torch.cuda.Stream()
        start_event = self._event_pool.pop() if self._event_pool else torch.Event(enable_timing=True)
        end_event = self._event_pool.pop() if self._event_pool else torch.Event(enable_timing=True)
        
        # 设置依赖关系
        if self.gpu_to_cpu:
            # GPU→CPU:等待当前 CUDA 流(模型计算)完成后再卸载
            # 避免传输未完成的计算结果
            stream.wait_stream(torch.cuda.current_stream())
        if self._transfers:
            # 等待上一个传输完成,保证顺序性
            last_transfer = self._transfers[-1]
            stream.wait_event(last_transfer.end_event)
        
        # 在专用 Stream 上执行批量块拷贝
        with torch.cuda.stream(stream):
            start_event.record(stream)
            if num_copy_ops > 0:
                # 核心操作:批量 DMA 传输
                # swap_blocks_batch 接收三个数组:src_ptrs, dst_ptrs, sizes
                # 每个元素表示一次 memcpy:从 src_ptrs[i] 拷贝 sizes[i] 字节到 dst_ptrs[i]
                ops.swap_blocks_batch(batch_src, batch_dst, batch_sizes)
            end_event.record(stream)
        
        # 记录传输信息
        self._transfer_events[job_id] = end_event
        self._transfers.append(
            Transfer(job_id=job_id, stream=stream,
                     start_event=start_event, end_event=end_event,
                     num_bytes=num_transfer_bytes)
        )
        return True

CUDA Stream 依赖图:

default_stream (模型计算)
       │
       │ wait_stream
       ▼
stream_1 (Transfer 1: GPU→CPU)
       │
       │ wait_event(end_event_1)
       ▼
stream_2 (Transfer 2: GPU→CPU)
       │
       │ wait_event(end_event_2)
       ▼
stream_3 (Transfer 3: GPU→CPU)

7.4 get_finished — 查询已完成传输

    def get_finished(self) -> list[TransferResult]:
        results = []
        # deque 头部是最早提交的传输
        # query() 检查 Event 是否完成(非阻塞)
        while self._transfers and self._transfers[0].end_event.query():
            transfer = self._transfers.popleft()
            # 计算传输耗时(毫秒→秒)
            transfer_time = transfer.start_event.elapsed_time(transfer.end_event) * 1e-3
            result = TransferResult(
                job_id=transfer.job_id,
                success=True,
                transfer_size=transfer.num_bytes,
                transfer_time=transfer_time,
                transfer_type=self.transfer_type,
            )
            results.append(result)
            # 回收 Stream 和 Event 到对象池
            self._stream_pool.append(transfer.stream)
            self._event_pool.append(transfer.end_event)
            self._event_pool.append(transfer.start_event)
            del self._transfer_events[transfer.job_id]
        return results

第八章 cpu/shared_offload_region.py — mmap 共享内存

图 16:mmap 文件创建协调时序图

Worker 1 (rank=1) /dev/shm/ Worker 0 (rank=0) Worker 1 (rank=1) /dev/shm/ Worker 0 (rank=0) 系统启动,多个 Worker 同时初始化 Worker 0 (Creator) 文件扩展到指定大小 Worker 1 (Opener) 自旋等待文件达到预期大小 两个 Worker 都 mmap 同一文件 各自 MADV_POPULATE_WRITE 自己的 Worker 区域 可选: pin_mmap_region() 注册 CUDA pinned memory 创建层视图 os.open(O_CREAT | O_EXCL | O_RDWR) fd (成功,文件不存在) os.ftruncate(fd, total_size_bytes) os.open(O_CREAT | O_EXCL | O_RDWR) FileExistsError os.open(O_RDWR) fd _wait_for_file_size(fd, expected) st_size >= expected_size mmap.mmap(fd, total_size, MAP_SHARED) mmap.mmap(fd, total_size, MAP_SHARED) populate Worker 0 的页面 populate Worker 1 的页面 cudaHostRegister(base_ptr, total_size) cudaHostRegister(base_ptr, total_size) create_next_view(L0_page_size) create_next_view(L1_page_size) create_next_view(L0_page_size) create_next_view(L1_page_size)

8.1 SharedOffloadRegion 完整逐行解析

class SharedOffloadRegion:
    """
    基于 mmap 的共享内存区域。
    
    核心设计:
    1. 使用 /dev/shm 上的文件实现多 Worker 间共享
    2. 第一个 Worker (creator) 用 O_EXCL 创建文件并 ftruncate
    3. 后续 Worker 打开已存在的文件并等待其达到预期大小
    4. 交错布局:每个 block 行包含所有 Worker 的数据
    
    布局示意(2 个 Worker,3 个 block):
    
    Row 0: [Worker0_Block0 | Worker1_Block0]
    Row 1: [Worker0_Block1 | Worker1_Block1]
    Row 2: [Worker0_Block2 | Worker1_Block2]
    
    row_stride = cpu_page_size * num_workers
    每个 Worker 的偏移 = rank * cpu_page_size
    """
    
    def __init__(self, instance_id, total_size_bytes, num_blocks, rank, num_workers, cpu_page_size):
        self.page_size = mmap.PAGESIZE
        self.total_size_bytes = total_size_bytes
        self.mmap_path = f"/dev/shm/vllm_offload_{instance_id}.mmap"
        self._creator = False
        self.num_blocks = num_blocks
        self.rank = rank
        self._row_stride = cpu_page_size * num_workers  # 行步幅
        
        if rank is not None:
            self._worker_offset = rank * cpu_page_size    # Worker 在每行中的偏移
            self._worker_area_end = (rank + 1) * cpu_page_size  # Worker 区域结束
        
        try:
            # O_CREAT | O_EXCL:独占创建,只有第一个 Worker 成功
            self.fd = os.open(self.mmap_path, os.O_CREAT | os.O_EXCL | os.O_RDWR, 0o600)
            os.ftruncate(self.fd, self.total_size_bytes)  # 扩展文件到指定大小
            self._creator = True
        except FileExistsError:
            # 后续 Worker 打开已存在的文件
            self.fd = os.open(self.mmap_path, os.O_RDWR)
            # 忙等待直到文件达到预期大小(creator 尚未完成 ftruncate)
            _wait_for_file_size(self.fd, self.total_size_bytes)
        
        # 创建共享 mmap 映射
        self.mmap_obj = mmap.mmap(
            self.fd, self.total_size_bytes,
            flags=mmap.MAP_SHARED,           # 共享映射(多进程可见)
            prot=mmap.PROT_READ | mmap.PROT_WRITE,  # 读写权限
        )
        
        # MADV_POPULATE_WRITE:预分配物理页,避免首次访问时缺页中断
        # 这对 DMA 传输至关重要——缺页中断会导致传输变慢
        _MADV_POPULATE_WRITE = getattr(mmap, "MADV_POPULATE_WRITE", 23)
        
        if rank is not None:
            # 只预分配当前 Worker 的页面(节省内存)
            for block in range(num_blocks):
                raw_offset = block * self._row_stride + worker_offset
                # 对齐到页边界
                aligned_offset = (raw_offset // page_size) * page_size
                end = raw_offset + cpu_page_size
                aligned_length = end - aligned_offset
                self.mmap_obj.madvise(_MADV_POPULATE_WRITE, aligned_offset, aligned_length)
        else:
            # 无 rank → 预分配整个区域
            self.mmap_obj.madvise(_MADV_POPULATE_WRITE, 0, self.total_size_bytes)
        
        # 将 mmap 转为 torch int8 张量
        self._base = torch.frombuffer(memoryview(self.mmap_obj), dtype=torch.int8)
        self._views: list[torch.Tensor] = []
        self.is_pinned: bool = False  # 是否已注册为 CUDA pinned memory

8.2 create_next_view — 创建层视图

    def create_next_view(self, tensor_page_size: int) -> torch.Tensor:
        """
        为当前 Worker 分配一个层的视图。
        
        每次 create_next_view 调用递增 _worker_offset,
        连续的层在内存中是连续排列的(在 Worker 区域内)。
        
        返回 shape = (num_blocks, tensor_page_size), stride = (row_stride, 1) 的 int8 张量
        使用 as_strided 创建非连续视图:
        - 行步幅 = row_stride(跳过其他 Worker 的数据)
        - 列步幅 = 1(int8 单字节)
        
        这样 swap_blocks_batch 可以用行号直接索引每个 block
        """
        assert self.rank is not None
        new_offset = self._worker_offset + tensor_page_size
        assert new_offset <= self._worker_area_end  # 不超过 Worker 区域
        
        worker_layer_view = torch.as_strided(
            self._base,
            size=(self.num_blocks, tensor_page_size),
            stride=(self._row_stride, 1),     # 行步幅跨越其他 Worker
            storage_offset=self._worker_offset, # 从 Worker 偏移开始
        )
        self._worker_offset = new_offset  # 递增偏移供下一层使用
        self._views.append(worker_layer_view)
        return worker_layer_view

层视图内存布局示例(2 Worker, 2 层, 3 blocks):

/dev/shm/vllm_offload_xxx.mmap

Worker0 区域:            Worker1 区域:
┌────────────────┐ ┌────────────────┐
│ W0_L0_Block0   │ │ W1_L0_Block0   │  ← Row 0 (stride = page * 2)
│ W0_L1_Block0   │ │ W1_L1_Block0   │
├────────────────┤ ├────────────────┤
│ W0_L0_Block1   │ │ W1_L0_Block1   │  ← Row 1
│ W0_L1_Block1   │ │ W1_L1_Block1   │
├────────────────┤ ├────────────────┤
│ W0_L0_Block2   │ │ W1_L0_Block2   │  ← Row 2
│ W0_L1_Block2   │ │ W1_L1_Block2   │
└────────────────┘ └────────────────┘

W0_L0 视图: stride = (2 * cpu_page_size, 1), offset = 0
W0_L1 视图: stride = (2 * cpu_page_size, 1), offset = L0_page_size

8.3 cleanup — 资源释放

    def cleanup(self) -> None:
        # 1. 如果注册了 pinned memory,先取消注册
        if self.is_pinned and self._base is not None:
            base_ptr = self._base.data_ptr()
            result = torch.cuda.cudart().cudaHostUnregister(base_ptr)
            if result.value != 0:
                logger.warning("cudaHostUnregister failed for rank=%d", self.rank)
            self.is_pinned = False
        
        # 2. 释放视图(释放对 _base 的引用,使 StorageImpl 的引用计数归零)
        if self._views is not None:
            self._views.clear()
        
        # 3. 释放 torch 张量(解除对 mmap buffer 的引用)
        self._base = None
        
        # 4. 关闭 mmap 映射
        if self.mmap_obj:
            self.mmap_obj.close()
            self.mmap_obj = None
        
        # 5. 关闭文件描述符
        if self.fd is not None:
            os.close(self.fd)
            self.fd = None
        
        # 6. 只有 creator 才删除文件(避免多 Worker 竞争删除)
        if self._creator and self.mmap_path:
            os.unlink(self.mmap_path)
            self._creator = False

第九章 worker/worker.py — Worker 侧抽象

9.1 OffloadingHandler 抽象

TransferSpec = tuple[LoadStoreSpec, LoadStoreSpec]  # (src_spec, dst_spec)
TransferType = tuple[str, str]                       # (src_medium, dst_medium)

@dataclass
class TransferResult:
    job_id: int
    success: bool
    transfer_size: int | None = None     # 传输字节数
    transfer_time: float | None = None   # 传输耗时(秒)
    transfer_type: TransferType | None = None  # 传输方向

class OffloadingHandler(ABC):
    """Worker 侧的异步传输处理器抽象"""
    
    @abstractmethod
    def transfer_async(self, job_id: int, spec: TransferSpec) -> bool:
        """提交异步传输,返回是否成功"""
    
    @abstractmethod
    def get_finished(self) -> list[TransferResult]:
        """获取自上次调用以来完成的传输"""
    
    @abstractmethod
    def wait(self, job_ids: set[int]) -> None:
        """阻塞等待指定 job 完成"""
    
    def shutdown(self) -> None:
        """默认空实现"""

9.2 OffloadingWorker — 多 Handler 路由器

class OffloadingWorker:
    """
    管理多个 OffloadingHandler,根据传输类型路由到正确的 Handler。
    
    注册机制:
    register_handler(GPULoadStoreSpec, CPULoadStoreSpec, gpu_to_cpu_handler)
    → transfer_type = ("GPU", "CPU") → handler
    
    transfer_async 时:
    spec = (GPULoadStoreSpec(...), CPULoadStoreSpec(...))
    → src.medium() = "GPU", dst.medium() = "CPU"
    → 查找 ("GPU", "CPU") → gpu_to_cpu_handler
    """
    
    def __init__(self):
        self.handlers: set[OffloadingHandler] = set()
        self.transfer_type_to_handler: dict[TransferType, OffloadingHandler] = {}
    
    def register_handler(self, src_cls, dst_cls, handler):
        transfer_type = (src_cls.medium(), dst_cls.medium())
        assert transfer_type not in self.transfer_type_to_handler  # 不允许重复注册
        self.handlers.add(handler)
        self.transfer_type_to_handler[transfer_type] = handler
    
    def transfer_async(self, job_id: int, spec: TransferSpec) -> bool:
        src, dst = spec
        transfer_type = (src.medium(), dst.medium())
        handler = self.transfer_type_to_handler.get(transfer_type)
        assert handler is not None
        
        try:
            success = handler.transfer_async(job_id, spec)
        except Exception as e:
            logger.warning("Exception in %r transfer %d: %r", transfer_type, job_id, e)
            return False
        
        if not success:
            logger.warning("Failed to submit %r transfer %d", transfer_type, job_id)
        return success
    
    def get_finished(self) -> list[TransferResult]:
        """从所有 Handler 收集已完成传输"""
        finished = []
        for handler in self.handlers:
            finished.extend(handler.get_finished())
        return finished
    
    def wait(self, job_ids: set[int]) -> None:
        for handler in self.handlers:
            handler.wait(job_ids)
    
    def shutdown(self) -> None:
        for handler in self.handlers:
            handler.shutdown()

第十章 cpu/common.py — CPULoadStoreSpec

class CPULoadStoreSpec(BlockIDsLoadStoreSpec):
    """CPU 侧加载/存储规格,比 GPU 规格简单——无 group_sizes/block_indices"""
    
    @staticmethod
    def medium() -> str:
        return "CPU"

CPU 规格更简单的原因:CPU 块是"大块"(block_size_factor 个 GPU 块合并),不存在子块映射问题。CPU 块 ID 直接对应物理内存位置,无需额外的分组信息。


文档二完成。接下来请参阅文档三(架构图/流程图/类图集合)。

Logo

免费领 100 小时云算力,进群参与显卡、AI PC 幸运抽奖

更多推荐