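[vllm] (vllm kv_offload) vLLM V1 KV Offload (Part 2): Line-by-Line Analysis of the Core Business Logic
vLLM v1 KV Offload Module: In-Depth Architecture Analysis (Part 2), Line-by-Line Analysis of the Core Business Logic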
Analysis based on the source directory:
github.com/vllm/vllm/v1/kv_offload
Figure 3: Initialization flowchart
Chapter 1 base.py: Core Abstract Base Classes and Data Structures
1.1 OffloadKey: Unique Identifier of an Offloaded Block
# Lines 24-27: OffloadKey type definition
# `OffloadKey` identifies an offloaded block. It combines a block hash with
# its KV cache group index, encoded as raw bytes to avoid tuple GC overhead.
# Use the helper functions below to construct / decompose keys.
OffloadKey = NewType("OffloadKey", bytes)
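Line-by-line notes:
- OffloadKey is a NewType alias of bytes, not a separate class. At runtime a key is simply a bytes object.
- Design intent: avoid the GC overhead of Python tuples. A (block_hash, group_idx) tuple is a container object that CPython's cyclic garbage collector may track, so millions of keys would add GC work. A bytes object is an immutable flat buffer that the cyclic GC never tracks, which makes keys cheaper to create and hold.
- A key carries two pieces of information: block_hash (the block's hash, used for prefix-cache matching) and group_idx (the KV cache group index, which supports hybrid models such as Mamba+Attention).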
# Lines 32-44: constructing and decomposing an OffloadKey
def make_offload_key(block_hash: bytes, group_idx: int) -> OffloadKey:
    """Pack a block hash and group index into an `OffloadKey`."""
    # Encode group_idx as a 4-byte big-endian unsigned integer and append it to block_hash.
    # Big-endian gives a platform-independent encoding; 4 bytes is enough for any realistic group_idx.
    return OffloadKey(block_hash + group_idx.to_bytes(4, "big", signed=False))
def get_offload_block_hash(key: OffloadKey) -> bytes:
    """Extract the block hash from an `OffloadKey`."""
    # The last 4 bytes are group_idx; everything before them is block_hash.
    return key[:-4]
def get_offload_group_idx(key: OffloadKey) -> int:
    """Extract the group index from an `OffloadKey`."""
    # Decode the last 4 bytes as a big-endian unsigned integer.
    return int.from_bytes(key[-4:], "big", signed=False)
Memory layout:
OffloadKey (bytes)
┌──────────────────────────────┬──────────────┐
│ block_hash (variable length) │ group_idx    │
│ N bytes (prefix-cache hash)  │ 4 bytes      │
│                              │ (big-endian) │
└──────────────────────────────┴──────────────┘
  ↑ key[:-4]                     ↑ key[-4:]
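A self-contained sketch of the encoding above that uses only the standard library. The three helpers are re-declared locally to mirror the excerpt, so you can check the round trip and the GC property of bytes without importing vLLM. The SHA-256 digest is an arbitrary 32-byte stand-in for a real prefix-cache block hash, which vLLM computes in its own way.

```python
import gc
import hashlib
from typing import NewType

OffloadKey = NewType("OffloadKey", bytes)

def make_offload_key(block_hash: bytes, group_idx: int) -> OffloadKey:
    # Append group_idx as a 4-byte big-endian unsigned integer.
    return OffloadKey(block_hash + group_idx.to_bytes(4, "big", signed=False))

def get_offload_block_hash(key: OffloadKey) -> bytes:
    return key[:-4]

def get_offload_group_idx(key: OffloadKey) -> int:
    return int.from_bytes(key[-4:], "big", signed=False)

# Stand-in block hash (any bytes work; this is not how vLLM hashes blocks).
block_hash = hashlib.sha256(b"tokens 0..15").digest()
key = make_offload_key(block_hash, 3)

print(len(key))                    # 36 = 32-byte hash + 4-byte group index
print(key[-4:].hex())              # 00000003
print(get_offload_group_idx(key))  # 3
print(gc.is_tracked(key))          # False: bytes are never tracked by the cyclic GC
```

The group index has a fixed width, so slicing from the end stays unambiguous even when the hash length varies.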
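1.2 ReqContext: Request Context
# Lines 47-49
@dataclass
class ReqContext:
    kv_transfer_params: dict[str, Any] | None = None
    # Per-request KV transfer parameters (e.g. inter-node transfer parameters in distributed inference).
    # Optional; None means the request has no special transfer requirements.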
1.3 LoadStoreSpec: Load/Store Specification Abstraction
# Lines 52-65
class LoadStoreSpec(ABC):
    """
    Abstract metadata that encapsulates information allowing a worker
    to load, and optionally also to store, blocks of KV data.
    """
    @staticmethod
    @abstractmethod
    def medium() -> str:
        """
        Returns a string representation of the medium type
        this store/load targets.
        """
        pass
    # Static method that returns the storage-medium identifier:
    # GPU → "GPU", CPU → "CPU".
    # OffloadingWorker uses it to route each transfer to the correct handler.
Design intent: LoadStoreSpec is the key data structure that connects the Manager (control plane) and the Handler (data plane). Once the Manager decides which blocks need to move, it wraps their block IDs in a LoadStoreSpec and passes it to a Handler, which performs the actual DMA transfer.
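1.4 PrepareStoreOutput: Result of Store Preparation
# Lines 68-72
@dataclass
class PrepareStoreOutput:
    keys_to_store: list[OffloadKey]  # keys of blocks to store (blocks already stored are filtered out)
    store_spec: LoadStoreSpec        # store spec (tells the Worker which CPU blocks to write)
    evicted_keys: list[OffloadKey]   # keys of blocks evicted to make room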
1.5 OffloadingEvent: Offloading Event
# Lines 75-80
@dataclass
class OffloadingEvent:
    keys: list[OffloadKey]  # keys involved
    medium: str             # medium type ("CPU")
    removed: bool           # True = blocks were evicted (removed), False = blocks were stored (added)
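1.6 OffloadingManager: Core Manager Abstraction
# Lines 110-217: the full OffloadingManager abstract class
class OffloadingManager(ABC):
    @abstractmethod
    def lookup(self, key: OffloadKey, req_context: ReqContext) -> bool | None:
        """
        Checks whether a single block is offloaded and ready to be read.
        Tri-state return value:
        - True:  the block is offloaded and readable (data is ready)
        - False: the block is not offloaded (not in the cache)
        - None:  the block is offloaded but still being written; the caller should retry later
        Returning None makes the Scheduler defer processing of that request.
        """
        pass
    @abstractmethod
    def prepare_load(
        self,
        keys: Collection[OffloadKey],
        req_context: ReqContext,
    ) -> LoadStoreSpec:
        """
        Prepares a load:
        1. Assumes every given key is offloaded (the caller should confirm with lookup first).
        2. Increments each block's reference count so it cannot be evicted during the load.
        3. Returns a LoadStoreSpec telling the Worker which CPU blocks to read.
        Note: the caller must call complete_load after the load finishes to release the references.
        """
        pass
    def touch(self, keys: Collection[OffloadKey]):
        """Marks blocks as recently used (no-op by default; overridden by the policy layer)."""
        return
    def complete_load(self, keys: Collection[OffloadKey]):
        """Load finished; releases the reference counts (no-op by default; overridden by subclasses)."""
        return
    @abstractmethod
    def prepare_store(
        self,
        keys: Collection[OffloadKey],
        req_context: ReqContext,
    ) -> PrepareStoreOutput | None:
        """
        Prepares a store:
        1. Filters out blocks that are already stored.
        2. Checks for free space and triggers eviction if there is not enough.
        3. Allocates CPU blocks and registers them with the policy.
        4. Returns a PrepareStoreOutput (the store spec plus the list of evicted keys).
        5. Returns None if the blocks cannot be stored (not enough space and nothing evictable).
        """
        pass
    def complete_store(self, keys: Collection[OffloadKey], success: bool = True):
        """
        Store finished:
        - success=True:  marks the blocks as ready (is_ready=True) so later lookups hit
        - success=False: removes the blocks from the policy and reclaims their CPU blocks
        """
        return
    def take_events(self) -> Iterable[OffloadingEvent]:
        """Returns the event stream (an empty iterable by default)."""
        return ()
    def shutdown(self) -> None:
        """Shuts down the manager and releases resources (no-op by default)."""
        return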
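1.7 BlockIDsLoadStoreSpec and GPULoadStoreSpec
# Lines 219-267
class BlockIDsLoadStoreSpec(LoadStoreSpec, ABC):
    """Base class for LoadStoreSpecs described by block IDs."""
    def __init__(self, block_ids: list[int]):
        # Stored as a numpy int64 array rather than a Python list because it is
        # later passed to compute_sub_block_ptrs for vectorized pointer arithmetic.
        self.block_ids = np.array(block_ids, dtype=np.int64)
class GPULoadStoreSpec(BlockIDsLoadStoreSpec):
    """
    GPU-side load/store spec.
    Core problem: when the CPU offload block size differs from the GPU block size
    (block_size_factor > 1), one CPU block corresponds to several GPU sub-blocks,
    so extra information is needed to map them correctly.
    Three key fields (see the __init__ parameters): block_ids, group_sizes, block_indices.
    """
    def __init__(
        self,
        block_ids: list[int],          # GPU block IDs
        group_sizes: Sequence[int],    # number of GPU blocks in each KV cache group
        block_indices: Sequence[int],  # logical index of the first block in each group
    ):
        super().__init__(block_ids)
        # Consistency checks
        assert sum(group_sizes) == len(block_ids)      # group sizes sum to the total block count
        assert len(block_indices) == len(group_sizes)  # one start index per group
        self.group_sizes: Sequence[int] = group_sizes
        self.block_indices: Sequence[int] = block_indices
        # What block_indices is for:
        # when block_size_factor > 1, a request's first GPU block may not sit at the start of a CPU block.
        # block_indices[i] % block_size_factor = offset (in sub-blocks) of group i's first block inside its CPU block,
        # which determines how much data the DMA transfer must skip.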
GPU/CPU block mapping:
GPU blocks (small blocks, block_size = 16 tokens)
┌────┬────┬────┬────┬────┬────┐
│ G0 │ G1 │ G2 │ G3 │ G4 │ G5 │ ← 6 GPU blocks
└────┴────┴────┴────┴────┴────┘
  │    │    │    │    │    │
  ▼    ▼    ▼    ▼    ▼    ▼
CPU blocks (large blocks, block_size_factor = 2)
┌─────────────┬─────────────┬─────────────┐
│     C0      │     C1      │     C2      │ ← 3 CPU blocks
│  (G0+G1)    │  (G2+G3)    │  (G4+G5)    │
└─────────────┴─────────────┴─────────────┘
If the request starts at G2 (block_indices = 2):
  block_indices % block_size_factor = 2 % 2 = 0 → nothing to skip
If the request starts at G3 (block_indices = 3):
  block_indices % block_size_factor = 3 % 2 = 1 → skip the first half of C1 (the G2 slot)
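You can check the offset arithmetic in the two cases above with a few lines of plain Python. The helper cpu_span is hypothetical and not part of vLLM. For a run of GPU blocks that starts at logical index start, it returns how many leading sub-blocks of the first CPU block to skip and how many CPU blocks the run touches. It mirrors the "% block_size_factor" and ceiling-division steps that appear later in transfer_async.

```python
def cdiv(a: int, b: int) -> int:
    """Ceiling division for non-negative integers."""
    return -(-a // b)

def cpu_span(start: int, num_gpu_blocks: int, block_size_factor: int) -> tuple[int, int]:
    """Hypothetical helper: (sub-blocks to skip in the first CPU block, CPU blocks touched)."""
    skip = start % block_size_factor
    # Count the skipped slots too, then round up to whole CPU blocks.
    return skip, cdiv(num_gpu_blocks + skip, block_size_factor)

print(cpu_span(2, 4, 2))  # (0, 2): G2..G5 fill C1 and C2 exactly
print(cpu_span(3, 3, 2))  # (1, 2): G3..G5 start in the second half of C1 and end in C2
```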
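1.8 The CanonicalKVCache Data Structures
# Lines 269-317
@dataclass
class CanonicalKVCacheTensor:
    """Canonical KV cache tensor whose first dimension is num_blocks."""
    tensor: torch.Tensor  # shape: (num_blocks, page_size_bytes), dtype=int8
    page_size_bytes: int  # (possibly padded) size of each block in bytes
@dataclass
class CanonicalKVCacheRef:
    """A layer's (or layer group's) reference to a particular CanonicalKVCacheTensor."""
    tensor_idx: int       # index into the tensors list
    page_size_bytes: int  # actual (unpadded) page size used by this layer
@dataclass
class CanonicalKVCaches:
    """
    Canonical block-level representation of the KV caches.
    Why it exists: attention backends lay out their raw tensors differently:
    - FlashAttention: (2, num_blocks, block_size, num_kv_heads, head_size), with K and V stacked along the first dimension
    - Other backends: (num_blocks, ...)
    CanonicalKVCaches normalizes every layout to an int8 view of shape (num_blocks, page_size_bytes),
    which decouples the offload logic from the specific attention implementation.
    """
    tensors: list[CanonicalKVCacheTensor]             # list of unique tensors
    group_data_refs: list[list[CanonicalKVCacheRef]]  # per-group, per-layer references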
CanonicalKVCaches mapping example (simplified shapes: the block_size dimension is omitted and each element is assumed to take 1 byte, e.g. an FP8 KV cache, so one page is num_heads × head_size bytes):
Original KV caches (FlashAttention layout):
Tensor A: shape (2, 1000, 32, 128) → split into 2 CanonicalKVCacheTensors
  Tensor A_K: shape (1000, 4096) [int8 view] → tensors[0]
  Tensor A_V: shape (1000, 4096) [int8 view] → tensors[1]
Tensor B: shape (2, 1000, 8, 128) → split
  Tensor B_K: shape (1000, 1024) [int8 view] → tensors[2]
  Tensor B_V: shape (1000, 1024) [int8 view] → tensors[3]
group_data_refs:
  Group 0 (attention layers 0-31):
    [CanonicalKVCacheRef(tensor_idx=0, page_size_bytes=4096),   → A_K
     CanonicalKVCacheRef(tensor_idx=1, page_size_bytes=4096)]   → A_V
  Group 1 (attention layers 32-39):
    [CanonicalKVCacheRef(tensor_idx=2, page_size_bytes=1024),   → B_K
     CanonicalKVCacheRef(tensor_idx=3, page_size_bytes=1024)]   → B_V
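Here is the canonicalization on concrete data, as a NumPy-only sketch. The real code works on torch.Tensor objects; NumPy is used here only so the example runs without a GPU. The sketch allocates a FlashAttention-style buffer of shape (2, num_blocks, block_size, num_kv_heads, head_size) in float16. It splits the buffer into K and V and reinterprets each half as a (num_blocks, page_size_bytes) int8 array. All sizes are arbitrary assumptions, and canonical_view is a hypothetical helper name.

```python
import numpy as np

num_blocks, block_size, num_kv_heads, head_size = 4, 16, 8, 128
kv = np.zeros((2, num_blocks, block_size, num_kv_heads, head_size), dtype=np.float16)

def canonical_view(t: np.ndarray) -> np.ndarray:
    """Reinterpret a contiguous (num_blocks, ...) array as (num_blocks, page_size_bytes) int8."""
    return t.reshape(t.shape[0], -1).view(np.int8)

k_view, v_view = canonical_view(kv[0]), canonical_view(kv[1])
page_size_bytes = block_size * num_kv_heads * head_size * kv.itemsize
print(k_view.shape, page_size_bytes)  # (4, 32768) 32768

# The views alias the original memory: writing to K block 1 shows up in int8 row 1.
kv[0, 1] = 1.0
print(k_view[1].any(), k_view[0].any())  # True False
```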
1.9 OffloadingSpec: Offloading Spec Abstraction
# Lines 319-398
class OffloadingSpec(ABC):
    """Spec for an offloading connector"""
    def __init__(self, vllm_config: VllmConfig, kv_cache_config: KVCacheConfig):
        # Experimental-API warning
        logger.warning(
            "Initializing OffloadingSpec. This API is experimental and "
            "subject to change in the future as we iterate the design."
        )
        self.vllm_config = vllm_config
        self.kv_cache_config = kv_cache_config
        kv_transfer_config = vllm_config.kv_transfer_config
        assert kv_transfer_config is not None  # KV transfer must be configured
        self.extra_config = kv_transfer_config.kv_connector_extra_config  # extra-config dict
        parallel_config = vllm_config.parallel_config
        # Context-parallel factor: decode_context_parallel × prefill_context_parallel;
        # it scales the block sizes computed below
        context_parallel_factor = (
            parallel_config.decode_context_parallel_size
            * parallel_config.prefill_context_parallel_size
        )
        # Block size (in tokens) vLLM uses for prefix-cache hashing
        # = cache_config.block_size × context_parallel_factor
        self.hash_block_size = (
            vllm_config.cache_config.block_size * context_parallel_factor
        )
        # GPU block size (in tokens) of each KV cache group
        self.gpu_block_size: tuple[int, ...] = tuple(
            kv_cache_group.kv_cache_spec.block_size * context_parallel_factor
            for kv_cache_group in kv_cache_config.kv_cache_groups
        )
        # Check: every GPU block size must be divisible by the hash block size.
        # Groups in hybrid models (e.g. Mamba+Attention) may use different block sizes,
        # but all of them must align with the hash block size, otherwise prefix-cache matching breaks.
        for block_size in self.gpu_block_size:
            assert block_size % self.hash_block_size == 0, (
                f"gpu_block_size={block_size} not divisible by "
                f"hash_block_size={self.hash_block_size}. "
                f"Hybrid models (e.g. Mamba+Attention) need "
                f"--enable-prefix-caching to align block sizes."
            )
        # Ratio of offload block size to GPU block size; defaults to 1 (same size)
        self.block_size_factor: int = 1
        offloaded_block_size = self.extra_config.get("block_size")
        if offloaded_block_size is not None:
            # The user configured a custom offload block size
            offloaded_block_size_int = int(offloaded_block_size)
            gpu_block_sizes = set(self.gpu_block_size)
            # A custom block size requires all KV cache groups to share one GPU block size
            assert len(gpu_block_sizes) == 1, (...)
            gpu_block_size = gpu_block_sizes.pop()
            # The offload block size must be an integer multiple of the GPU block size
            assert offloaded_block_size_int % gpu_block_size == 0
            self.block_size_factor = offloaded_block_size_int // gpu_block_size
Chapter 2 factory.py: Factory Pattern Implementation
# Lines 17-58: the full OffloadingSpecFactory
class OffloadingSpecFactory:
    # Class-level registry: name → lazy loader function
    _registry: dict[str, Callable[[], type[OffloadingSpec]]] = {}
    @classmethod
    def register_spec(cls, name: str, module_path: str, class_name: str) -> None:
        """Register a lazily loaded Spec."""
        if name in cls._registry:
            raise ValueError(f"Connector '{name}' is already registered.")
        # A closure implements lazy loading: the module is imported when the spec is created, not when it is registered
        def loader() -> type[OffloadingSpec]:
            module = importlib.import_module(module_path)
            return getattr(module, class_name)
        cls._registry[name] = loader
    @classmethod
    def create_spec(
        cls,
        config: "VllmConfig",
        kv_cache_config: "KVCacheConfig",
    ) -> OffloadingSpec:
        """Create an OffloadingSpec instance from the configuration."""
        kv_transfer_config = config.kv_transfer_config
        assert kv_transfer_config is not None
        extra_config = kv_transfer_config.kv_connector_extra_config
        # Read the spec name from the config; defaults to "CPUOffloadingSpec"
        spec_name = extra_config.get("spec_name", "CPUOffloadingSpec")
        if spec_name in cls._registry:
            # Prefer the registry (lazy loading)
            spec_cls = cls._registry[spec_name]()
        else:
            # Fall back to loading from a dynamic module path
            spec_module_path = extra_config.get("spec_module_path")
            if spec_module_path is None:
                raise ValueError(f"Unsupported spec type: {spec_name}")
            spec_module = importlib.import_module(spec_module_path)
            spec_cls = getattr(spec_module, spec_name)
        assert issubclass(spec_cls, OffloadingSpec)
        logger.info("Creating offloading spec with name: %s", spec_name)
        return spec_cls(config, kv_cache_config)
# Built-in registration: CPUOffloadingSpec
OffloadingSpecFactory.register_spec(
    "CPUOffloadingSpec", "vllm.v1.kv_offload.cpu.spec", "CPUOffloadingSpec"
)
Factory decision flow:
spec_name (from config)
  │
  ├── in registry     → loader() → spec_cls
  │
  └── not in registry → spec_module_path set?
        ├── yes → import_module(spec_module_path), getattr → spec_cls
        └── no  → raise ValueError
Both successful paths end in: spec_cls(config, kv_cache_config)
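The lazy-loading trick in register_spec is a closure that defers importlib.import_module until first use. The sketch below shows the same pattern on its own. LazyRegistry is a hypothetical class, and the standard-library class json.decoder.JSONDecoder stands in for an OffloadingSpec subclass.

```python
import importlib
from typing import Callable

class LazyRegistry:
    """Hypothetical name -> class registry that imports modules only on lookup."""

    def __init__(self) -> None:
        self._registry: dict[str, Callable[[], type]] = {}

    def register(self, name: str, module_path: str, class_name: str) -> None:
        if name in self._registry:
            raise ValueError(f"'{name}' is already registered.")

        def loader() -> type:
            # Runs only when the entry is resolved, not at registration time.
            module = importlib.import_module(module_path)
            return getattr(module, class_name)

        self._registry[name] = loader

    def resolve(self, name: str) -> type:
        if name not in self._registry:
            raise ValueError(f"Unsupported type: {name}")
        return self._registry[name]()

reg = LazyRegistry()
reg.register("decoder", "json.decoder", "JSONDecoder")
print(reg.resolve("decoder").__name__)  # JSONDecoder
```

One consequence of this design is that a typo in a registered module path only surfaces when that entry is first resolved.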
Chapter 3 reuse_manager.py: Reuse-Frequency Filter Decorator
3.1 FilterReusedOffloadingManager, Line by Line
# Lines 23-121
class FilterReusedOffloadingManager(OffloadingManager):
    """
    OffloadingManager decorator: skips storing blocks whose reuse frequency is below a threshold.
    Core motivation: a block that is used only once (e.g. the KV cache of a one-off prefill)
    is not worth offloading to CPU. It will almost never be reused, so offloading it
    wastes CPU memory and DMA bandwidth.
    Only blocks that lookup() has hit often enough justify the cost of offloading.
    Two interception points:
    1. lookup(): records access counts
    2. prepare_store(): filters out infrequently used blocks
    """
    def __init__(
        self,
        backing: OffloadingManager,      # the wrapped (underlying) manager
        store_threshold: int = 2,        # store threshold (default 2 = looked up at least twice)
        max_tracker_size: int = 64_000,  # max number of tracked keys (LRU eviction)
    ):
        # Argument validation
        if store_threshold < 2:
            # A threshold of 1 disables filtering (storing is allowed on the first lookup), so it is pointless
            raise ValueError(...)
        if max_tracker_size < 1:
            raise ValueError(...)
        self._backing = backing
        self.store_threshold = store_threshold
        self.max_tracker_size = max_tracker_size
        # OrderedDict gives O(1) LRU eviction
        # key: OffloadKey, value: access count
        self.counts: OrderedDict[OffloadKey, int] = OrderedDict()
    # ------------------------------------------------------------------
    # Intercepted methods
    # ------------------------------------------------------------------
    def lookup(self, key: OffloadKey, req_context: ReqContext) -> bool | None:
        """Record the key's access count, then delegate to the backing manager."""
        if key in self.counts:
            # Already tracked: move to the end (most recently used) and increment the count
            self.counts.move_to_end(key)
            self.counts[key] += 1
        else:
            # New key: if the tracker is full, evict the oldest entry first
            if len(self.counts) >= self.max_tracker_size:
                self.counts.popitem(last=False)  # drop the least recently looked-up key (LRU)
            self.counts[key] = 1
        # Delegate to the backing manager's lookup
        return self._backing.lookup(key, req_context)
    def prepare_store(
        self, keys: Collection[OffloadKey], req_context: ReqContext
    ) -> PrepareStoreOutput | None:
        """Filter out infrequently used blocks, then delegate to the backing manager."""
        # Keep only blocks looked up at least store_threshold times
        eligible = [
            key for key in keys if self.counts.get(key, 0) >= self.store_threshold
        ]
        # An empty eligible list is safe: CPUOffloadingManager handles empty input correctly
        return self._backing.prepare_store(eligible, req_context)
    # ------------------------------------------------------------------
    # Pure delegation (no interception logic)
    # ------------------------------------------------------------------
    def prepare_load(self, keys, req_context) -> LoadStoreSpec:
        return self._backing.prepare_load(keys, req_context)
    def touch(self, keys) -> None:
        return self._backing.touch(keys)
    def complete_load(self, keys) -> None:
        return self._backing.complete_load(keys)
    def complete_store(self, keys, success=True) -> None:
        return self._backing.complete_store(keys, success)
    def take_events(self) -> Iterable[OffloadingEvent]:
        return self._backing.take_events()
Frequency-filter decision flow:
lookup(key) is called
  │
  ▼
key already in counts?
  ├── Yes → move_to_end(key); counts[key] += 1
  └── No  → counts full (len >= max_tracker_size)?
              ├── Yes → popitem(last=False) evicts the least recently looked-up key, then counts[key] = 1
              └── No  → counts[key] = 1
  │
  ▼
delegate to _backing.lookup() (on every path)
prepare_store(keys) is called
  │
  ▼
eligible = [k for k in keys if counts.get(k, 0) >= threshold]
  │
  ├── eligible empty     → _backing.prepare_store([]) → returns an empty PrepareStoreOutput
  │
  └── eligible non-empty → _backing.prepare_store(eligible) → returns the normal result
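A trimmed-down, dependency-free version of the counting logic shows how store_threshold and max_tracker_size interact. ReuseCounter is a hypothetical name. It keeps only the two intercepted behaviours (counting on lookup and filtering on prepare_store) and drops the delegation to a backing manager.

```python
from collections import OrderedDict

class ReuseCounter:
    """Hypothetical standalone version of the lookup-count / store-filter logic."""

    def __init__(self, store_threshold: int = 2, max_tracker_size: int = 64_000):
        self.store_threshold = store_threshold
        self.max_tracker_size = max_tracker_size
        self.counts: OrderedDict[bytes, int] = OrderedDict()

    def record_lookup(self, key: bytes) -> None:
        if key in self.counts:
            self.counts.move_to_end(key)          # now the most recently looked up
            self.counts[key] += 1
        else:
            if len(self.counts) >= self.max_tracker_size:
                self.counts.popitem(last=False)   # drop the least recently looked-up key
            self.counts[key] = 1

    def eligible(self, keys: list[bytes]) -> list[bytes]:
        return [k for k in keys if self.counts.get(k, 0) >= self.store_threshold]

rc = ReuseCounter(store_threshold=2, max_tracker_size=2)
for k in [b"a", b"b", b"a", b"c"]:      # b"c" evicts b"b", the LRU entry
    rc.record_lookup(k)
print(dict(rc.counts))                   # {b'a': 2, b'c': 1}
print(rc.eligible([b"a", b"b", b"c"]))   # [b'a']
```

With a small tracker, a block's count can be forgotten before it reaches the threshold, so max_tracker_size effectively bounds how far apart two lookups of the same block may be.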
Chapter 4 cpu/manager.py: The CPU Offloading Manager
4.1 CPUOffloadingManager, Line by Line
# Lines 25-205
class CPUOffloadingManager(OffloadingManager):
    """
    An OffloadingManager implementation with a pluggable CachePolicy.
    The manager owns all shared logic: reference counting, event emission,
    block-pool management, and the skeleton of prepare_store/complete_store.
    Policy-specific block organization and eviction decisions are delegated to the CachePolicy implementation.
    """
    def __init__(
        self,
        num_blocks: int,                              # size of the CPU block pool
        cache_policy: Literal["lru", "arc"] = "lru",  # policy selection
        enable_events: bool = False,                  # whether to record events
    ):
        self.medium: str = CPULoadStoreSpec.medium()  # "CPU"
        self._num_blocks: int = num_blocks            # total block count (upper bound)
        self._num_allocated_blocks: int = 0           # blocks handed out so far (grows monotonically up to the limit)
        self._free_list: list[int] = []               # IDs of freed blocks (reusable)
        self.events: list[OffloadingEvent] | None = [] if enable_events else None
        # Pick the implementation class by policy name
        policy_cls = _CACHE_POLICIES.get(cache_policy)
        if policy_cls is None:
            raise ValueError(f"Unknown cache policy: {cache_policy!r}")
        self._policy: CachePolicy = policy_cls(cache_capacity=num_blocks)
Block-pool mechanism (example):
_num_blocks = 10
_num_allocated_blocks = 7
_free_list = [5, 3]
Block ID:  0   1   2   3   4   5   6 │ 7   8   9
State:     U   U   U   F   U   F   U │ .   .   .
U = allocated and in use
F = allocated, then freed; sits in _free_list waiting to be reused
. = never allocated (IDs 7-9 form the unallocated range and are handed out in order)
_get_num_free_blocks() = len(_free_list) + _num_blocks - _num_allocated_blocks
                       = 2 + 10 - 7 = 5
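A tiny standalone pool reproduces the free-block arithmetic and the fresh-then-reuse allocation order. BlockPool is a hypothetical class that mirrors _get_num_free_blocks, _allocate_blocks and _free_block, using plain ints instead of BlockStatus. The state is set up to match the diagram.

```python
class BlockPool:
    """Hypothetical mirror of CPUOffloadingManager's block-pool bookkeeping."""

    def __init__(self, num_blocks: int):
        self.num_blocks = num_blocks
        self.num_allocated = 0          # high-water mark of IDs ever handed out
        self.free_list: list[int] = []  # freed IDs waiting for reuse

    def num_free(self) -> int:
        return len(self.free_list) + self.num_blocks - self.num_allocated

    def allocate(self, n: int) -> list[int]:
        num_fresh = min(n, self.num_blocks - self.num_allocated)
        num_reused = n - num_fresh
        assert len(self.free_list) >= num_reused, "caller must evict first"
        ids = list(range(self.num_allocated, self.num_allocated + num_fresh))
        self.num_allocated += num_fresh
        ids += [self.free_list.pop() for _ in range(num_reused)]  # LIFO reuse
        return ids

    def free(self, block_id: int) -> None:
        self.free_list.append(block_id)

pool = BlockPool(10)
pool.allocate(7)          # IDs 0..6
pool.free(5)
pool.free(3)              # free_list == [5, 3]
print(pool.num_free())    # 5
print(pool.allocate(4))   # [7, 8, 9, 3]: fresh IDs first, then the most recently freed ID
```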
# --- Block-pool operations ---
def _get_num_free_blocks(self) -> int:
    # Free blocks = length of the free list + size of the never-allocated range
    return len(self._free_list) + self._num_blocks - self._num_allocated_blocks
def _allocate_blocks(self, keys: list[OffloadKey]) -> list[BlockStatus]:
    num_fresh = min(len(keys), self._num_blocks - self._num_allocated_blocks)
    # num_fresh: number of new blocks available from the never-allocated range
    num_reused = len(keys) - num_fresh
    # num_reused: number of blocks that must be reused from _free_list
    assert len(self._free_list) >= num_reused  # the free list must hold enough blocks
    blocks: list[BlockStatus] = []
    # Fresh blocks: use _num_allocated_blocks directly as the (increasing) block ID
    for _ in range(num_fresh):
        blocks.append(BlockStatus(self._num_allocated_blocks))
        self._num_allocated_blocks += 1
    # Reused blocks: pop from the end of _free_list (LIFO, for better locality)
    for _ in range(num_reused):
        blocks.append(BlockStatus(self._free_list.pop()))
    return blocks
def _free_block(self, block: BlockStatus) -> None:
    # Return the block ID to the free list
    self._free_list.append(block.block_id)
def _get_load_store_spec(
    self, keys, blocks
) -> CPULoadStoreSpec:
    # Convert a list of BlockStatus into a CPULoadStoreSpec (extracting the block_id array)
    return CPULoadStoreSpec([block.block_id for block in blocks])
4.2 lookup Implementation
def lookup(self, key: OffloadKey, req_context: ReqContext) -> bool | None:
    block = self._policy.get(key)  # look the block up in the policy
    if block is None:
        return False  # the block is not cached
    if not block.is_ready:
        return None   # the block is still being written (ref_cnt == -1); retry later
    return True       # the block is ready to be read
4.3 prepare_load Implementation
def prepare_load(self, keys, req_context) -> LoadStoreSpec:
    blocks = []
    for key in keys:
        block = self._policy.get(key)
        # The block must exist (the caller should have confirmed this with lookup)
        assert block is not None, f"Block {key!r} not found in cache"
        # The block must be ready (a block that is still being written cannot be loaded)
        assert block.is_ready, f"Block {key!r} is not ready for reading"
        block.ref_cnt += 1  # pin the block so it cannot be evicted during the load
        blocks.append(block)
    return self._get_load_store_spec(keys, blocks)
4.4 prepare_store Implementation (the most complex method)
def prepare_store(self, keys, req_context) -> PrepareStoreOutput | None:
    # Step 1: filter out blocks that are already stored
    keys_to_store = [k for k in keys if self._policy.get(k) is None]
    if not keys_to_store:
        # Every block is already stored; nothing to do
        return PrepareStoreOutput(
            keys_to_store=[],
            store_spec=self._get_load_store_spec([], []),
            evicted_keys=[],
        )
    # Step 2: compute how many blocks must be evicted
    num_blocks_to_evict = len(keys_to_store) - self._get_num_free_blocks()
    to_evict: list[OffloadKey] = []
    if num_blocks_to_evict > 0:
        # Eviction needed: every key in this request (including already-stored ones) must be kept
        protected = set(keys)
        evicted = self._policy.evict(num_blocks_to_evict, protected)
        if evicted is None:
            # Eviction failed (not enough evictable blocks) → the blocks cannot be stored
            return None
        for key, block in evicted:
            self._free_block(block)  # return the block to the free list
            to_evict.append(key)
    # Record the eviction event
    if to_evict and self.events is not None:
        self.events.append(
            OffloadingEvent(keys=to_evict, medium=self.medium, removed=True)
        )
    # Step 3: allocate CPU blocks
    blocks = self._allocate_blocks(keys_to_store)
    assert len(blocks) == len(keys_to_store)
    # Step 4: register with the policy (new blocks have ref_cnt = -1, meaning "being written")
    for key, block in zip(keys_to_store, blocks):
        self._policy.insert(key, block)
    # Build the store spec
    store_spec = self._get_load_store_spec(keys_to_store, blocks)
    return PrepareStoreOutput(
        keys_to_store=keys_to_store,
        store_spec=store_spec,
        evicted_keys=to_evict,
    )
4.5 complete_store Implementation
def complete_store(self, keys, success=True) -> None:
    stored_keys: list[OffloadKey] = []
    if success:
        # Store succeeded: mark the blocks as ready
        for key in keys:
            block = self._policy.get(key)
            if block is not None and not block.is_ready:
                block.ref_cnt = 0  # -1 → 0: ready and unreferenced
                stored_keys.append(key)
    else:
        # Store failed: drop the blocks and reclaim their space
        for key in keys:
            block = self._policy.get(key)
            if block is not None and not block.is_ready:
                self._policy.remove(key)
                self._free_block(block)
    # Record the successful-store event
    if stored_keys and self.events is not None:
        self.events.append(
            OffloadingEvent(keys=stored_keys, medium=self.medium, removed=False)
        )
4.6 The BlockStatus State Machine
State transitions of BlockStatus.ref_cnt:
  insert()               complete_store(True)     prepare_load()
┌─────────────────┐       ┌───────────────┐       ┌──────────────────┐
│ ref_cnt = -1    │ ────→ │ ref_cnt = 0   │ ────→ │ ref_cnt += 1     │
│ (being written) │       │ (ready, idle) │ ←──── │ (pinned by load) │
└─────────────────┘       └───────────────┘       └──────────────────┘
         │                                  complete_load():
         │ complete_store(success=False)    ref_cnt -= 1, back to 0
         ▼                                  after the last load finishes
┌──────────────────────────┐
│ removed from the policy, │
│ block returned to pool   │
└──────────────────────────┘
lookup() results: ref_cnt == -1 → None (write in progress, retry later); ref_cnt >= 0 → True.
Eviction: only blocks with ref_cnt == 0 are eligible.
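Chapter 5 cpu/policies/: Cache Replacement Policies
5.1 BlockStatus (policies/base.py)
class BlockStatus(ctypes.Structure):
    """
    Why ctypes.Structure instead of a Python dataclass:
    1. Compact payload: c_int32 (4 B) + 4 B alignment padding + c_int64 (8 B) = 16 bytes in one C buffer
       (the Python wrapper object adds its own header), whereas a Python dataclass needs at least 56 bytes
    2. Fewer Python objects: both fields are stored inline as raw C integers rather than as separate Python int objects
    3. Cache-friendly: a smaller footprint means better L1/L2 cache hit rates
    """
    _fields_ = [("ref_cnt", ctypes.c_int32), ("block_id", ctypes.c_int64)]
    def __init__(self, block_id: int):
        super().__init__()
        self.ref_cnt = -1         # -1 means "being written, not readable"
        self.block_id = block_id  # physical index of the CPU block
    @property
    def is_ready(self) -> bool:
        return self.ref_cnt >= 0  # >= 0 means the data is ready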
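You can check the size claim with ctypes.sizeof. The structure below copies the two-field layout from the excerpt. On common 64-bit ABIs the c_int64 field is 8-byte aligned, so 4 bytes of padding follow ref_cnt and the payload is 16 bytes. This measures only the C payload; each instance is still wrapped in a Python object header.

```python
import ctypes

class BlockStatus(ctypes.Structure):
    _fields_ = [("ref_cnt", ctypes.c_int32), ("block_id", ctypes.c_int64)]

    def __init__(self, block_id: int):
        super().__init__()
        self.ref_cnt = -1         # -1: being written, not readable
        self.block_id = block_id

    @property
    def is_ready(self) -> bool:
        return self.ref_cnt >= 0

print(ctypes.sizeof(BlockStatus))   # 16 on typical 64-bit platforms
print(BlockStatus.block_id.offset)  # 8: padding follows the 4-byte ref_cnt
b = BlockStatus(42)
print(b.is_ready)                   # False
b.ref_cnt = 0
print(b.is_ready, b.block_id)       # True 42
```

If you swapped the field order (int64 first), the struct would still be 16 bytes because of trailing padding, so reordering fields does not help here.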
5.2 LRUCachePolicy (policies/lru.py)
class LRUCachePolicy(CachePolicy):
    """LRU policy backed by an OrderedDict."""
    def __init__(self, cache_capacity: int):
        # cache_capacity is unused: LRU needs no capacity limit of its own (the Manager's block pool enforces it)
        self.blocks: OrderedDict[OffloadKey, BlockStatus] = OrderedDict()
    def get(self, key: OffloadKey) -> BlockStatus | None:
        return self.blocks.get(key)  # O(1) lookup
    def insert(self, key: OffloadKey, block: BlockStatus) -> None:
        self.blocks[key] = block     # inserted at the end (MRU position)
    def remove(self, key: OffloadKey) -> None:
        del self.blocks[key]
    def touch(self, keys: Iterable[OffloadKey]) -> None:
        # Iterate in reverse so that keys[0] (the head of the sequence) is moved last and ends up as the MRU entry.
        # The tail of a sequence therefore ages out before its head. This matters because a block is only
        # useful for prefix matching if every block before it is also cached.
        for key in reversed(list(keys)):
            if key in self.blocks:
                self.blocks.move_to_end(key)  # move to the end (mark as most recently used)
    def evict(self, n: int, protected: set[OffloadKey]) -> list | None:
        if n == 0:
            return []
        candidates = []
        # OrderedDict iterates in insertion/move order; the front holds the least recently used entries
        for key, block in self.blocks.items():
            if block.ref_cnt == 0 and key not in protected:
                # ref_cnt == 0: ready and not pinned by any load, so it is safe to evict
                # not in protected: not part of the current store request
                candidates.append((key, block))
                if len(candidates) == n:
                    break
        if len(candidates) < n:
            return None  # cannot evict enough blocks → the operation fails and nothing is removed
        # Remove in one batch
        for key, _ in candidates:
            del self.blocks[key]
        return candidates
5.3 ARCCachePolicy (policies/arc.py): The Most Complex Policy
class ARCCachePolicy(CachePolicy):
    """
    ARC (Adaptive Replacement Cache).
    Four lists plus one adaptive target:
    - T1: recency cache (blocks accessed only once)
    - T2: frequency cache (blocks accessed more than once)
    - B1: ghost list of T1 (records keys evicted from T1)
    - B2: ghost list of T2 (records keys evicted from T2)
    - target_t1_size: adaptive target size of T1
    Adaptation:
    - B1 hit → recency matters more → grow T1's share
    - B2 hit → frequency matters more → shrink T1's share
    """
    def __init__(self, cache_capacity: int):
        self.cache_capacity: int = cache_capacity
        self.target_t1_size: float = 0.0  # T1's initial share is 0 (everything goes to T2)
        self.t1: OrderedDict[OffloadKey, BlockStatus] = OrderedDict()
        self.t2: OrderedDict[OffloadKey, BlockStatus] = OrderedDict()
        self.b1: OrderedDict[OffloadKey, None] = OrderedDict()  # ghost lists store keys only
        self.b2: OrderedDict[OffloadKey, None] = OrderedDict()
ARC data-structure layout:
              cache_capacity
┌────────────────────────────────────────┐
│  target_t1_size                        │
│ ◄─────────────►                        │
│ ┌────────┐ ┌──────────────────────┐    │
│ │   T1   │ │          T2          │    │
│ │(recent)│ │      (frequent)      │    │
│ └────────┘ └──────────────────────┘    │
└────────────────────────────────────────┘
B1 (ghost list of T1)     B2 (ghost list of T2)
┌─────────────────────┐   ┌─────────────────────┐
│ keys evicted from   │   │ keys evicted from   │
│ T1 (no BlockStatus) │   │ T2 (no BlockStatus) │
└─────────────────────┘   └─────────────────────┘
Eviction decision:
  if |T1| >= target_t1_size → evict from T1 (the key moves to B1)
  else                      → evict from T2 (the key moves to B2)
def get(self, key: OffloadKey) -> BlockStatus | None:
    return self.t1.get(key) or self.t2.get(key)  # check T1 first, then T2
def insert(self, key: OffloadKey, block: BlockStatus) -> None:
    self.t1[key] = block    # new blocks always enter T1 (the recency list)
    self.b1.pop(key, None)  # drop the key from ghost list B1 if present
    self.b2.pop(key, None)  # drop the key from ghost list B2 if present
def remove(self, key: OffloadKey) -> None:
    if self.t1.pop(key, None) is None:  # try removing it from T1
        self.t2.pop(key, None)          # otherwise remove it from T2
def touch(self, keys: Iterable[OffloadKey]) -> None:
    """ARC's core adaptive-learning logic."""
    for key in reversed(list(keys)):
        if key in self.t1:
            block = self.t1.pop(key)
            if not block.is_ready:
                # Still being written (just inserted): this does not count as a second access.
                # Keep it in T1, re-inserted at the MRU position.
                self.t1[key] = block
            else:
                # Ready → promote from T1 to T2 ("recent" becomes "frequent")
                self.t2[key] = block
        elif key in self.t2:
            # Already in T2 → move to the MRU position
            self.t2.move_to_end(key)
        elif key in self.b1:
            # Hit in ghost list B1 → recency matters more; grow T1's target
            delta = max(1, len(self.b2) / len(self.b1))
            self.target_t1_size = min(
                self.target_t1_size + delta, self.cache_capacity
            )
            self.b1.move_to_end(key)  # refresh its position in the ghost list
        elif key in self.b2:
            # Hit in ghost list B2 → frequency matters more; shrink T1's target
            delta = max(1, len(self.b1) / len(self.b2))
            self.target_t1_size = max(self.target_t1_size - delta, 0)
            self.b2.move_to_end(key)
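Adaptive-learning flow of touch():
touch(key)
  │
  ├── key in T1 ──→ block.is_ready?
  │                   ├── Yes → T1 → T2 (promotion from "recent" to "frequent")
  │                   └── No  → stays in T1, moved to the MRU position
  │
  ├── key in T2 ──→ move_to_end (keeps its "frequent" status)
  │
  ├── key in B1 ──→ target_t1_size += delta (grow the "recent" share)
  │                 delta = max(1, |B2|/|B1|)
  │                 Meaning: a key recently evicted from T1 was wanted again, so T1 was too small.
  │                 The step is larger when B1 is small relative to B2, because a hit in the
  │                 smaller ghost list is the stronger signal.
  │
  └── key in B2 ──→ target_t1_size -= delta (shrink the "recent" share)
                    delta = max(1, |B1|/|B2|)
                    Meaning: a key recently evicted from T2 was wanted again, so T2 deserves more room.
                    Symmetrically, the step is larger when B2 is small relative to B1.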
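Each ghost hit adjusts the target with a single formula. The sketch below restates that formula on its own, using a hypothetical helper named adapt_target with the same arithmetic as touch above. It makes the asymmetry visible: a hit in the smaller ghost list moves the target further.

```python
def adapt_target(target: float, hit_in: str, len_b1: int, len_b2: int,
                 capacity: int) -> float:
    """Return the new target_t1_size after a ghost hit in 'b1' or 'b2'."""
    if hit_in == "b1":
        delta = max(1, len_b2 / len_b1)  # len_b1 >= 1 because the key is in B1
        return min(target + delta, capacity)
    if hit_in == "b2":
        delta = max(1, len_b1 / len_b2)  # len_b2 >= 1 because the key is in B2
        return max(target - delta, 0)
    raise ValueError(hit_in)

t = 0.0
t = adapt_target(t, "b1", len_b1=2, len_b2=8, capacity=10)  # delta = 8 / 2 = 4
print(t)  # 4.0
t = adapt_target(t, "b1", len_b1=8, len_b2=2, capacity=10)  # delta = max(1, 0.25) = 1
print(t)  # 5.0
t = adapt_target(t, "b2", len_b1=9, len_b2=1, capacity=10)  # delta = 9, clamped at 0
print(t)  # 0
```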
def evict(self, n: int, protected: set[OffloadKey]) -> list | None:
    """ARC eviction logic."""
    if n == 0:
        return []
    # Atomicity: first simulate selecting every candidate, and mutate state only once enough are found
    candidates: list[tuple[OffloadKey, BlockStatus, bool]] = []  # (key, block, from_t1)
    already_selected: set[OffloadKey] = set()
    virtual_t1_size = len(self.t1)  # simulated T1 size that tracks the pending evictions
    for _ in range(n):
        candidate = None
        # Decide where to evict from based on target_t1_size
        if virtual_t1_size >= int(self.target_t1_size):
            # T1 is at or above its target → evict from T1
            for key, block in self.t1.items():
                if (block.ref_cnt == 0
                        and key not in protected
                        and key not in already_selected):
                    candidate = (key, block, True)
                    virtual_t1_size -= 1
                    break
        if candidate is None:
            # T1 is below its target, or T1 has no evictable block → evict from T2
            for key, block in self.t2.items():
                if (block.ref_cnt == 0
                        and key not in protected
                        and key not in already_selected):
                    candidate = (key, block, False)
                    break
        if candidate is None:
            return None  # cannot evict enough blocks
        candidates.append(candidate)
        already_selected.add(candidate[0])
    # Apply the evictions in one batch
    result = []
    for key, block, from_t1 in candidates:
        if from_t1:
            del self.t1[key]
            self.b1[key] = None  # evicted from T1 → added to ghost list B1
        else:
            del self.t2[key]
            self.b2[key] = None  # evicted from T2 → added to ghost list B2
        result.append((key, block))
    # Trim each ghost list to cache_capacity
    for ghost in (self.b1, self.b2):
        for _ in range(len(ghost) - self.cache_capacity):
            ghost.popitem(last=False)  # drop the oldest ghost entries first
    return result
Chapter 6 cpu/spec.py: The CPU Offloading Spec
class CPUOffloadingSpec(OffloadingSpec):
    def __init__(self, vllm_config: VllmConfig, kv_cache_config: KVCacheConfig):
        super().__init__(vllm_config, kv_cache_config)
        # CPU memory budget from the config (required)
        cpu_bytes_to_use = self.extra_config.get("cpu_bytes_to_use")
        if not cpu_bytes_to_use:
            raise Exception("cpu_bytes_to_use must be specified")
        # Compute the KV bytes per offloaded block
        if kv_cache_config.num_blocks > 0:
            total_gpu_kv_bytes = sum(t.size for t in kv_cache_config.kv_cache_tensors)
            # KV bytes per GPU block × world_size (total KV across all ranks)
            kv_bytes_per_block = (
                total_gpu_kv_bytes // kv_cache_config.num_blocks
            ) * vllm_config.parallel_config.world_size
        else:
            kv_bytes_per_block = 0
        # Account for block_size_factor: an offloaded block may be larger than a GPU block
        kv_bytes_per_offloaded_block = kv_bytes_per_block * self.block_size_factor
        # Number of CPU blocks that fit in the budget
        self.num_blocks = (
            int(cpu_bytes_to_use) // kv_bytes_per_offloaded_block
            if kv_bytes_per_offloaded_block > 0
            else 0
        )
        # Components created lazily
        self._manager: OffloadingManager | None = None
        self._handlers: CpuGpuOffloadingHandlers | None = None
        self.eviction_policy: str = self.extra_config.get("eviction_policy", "lru")
    def get_manager(self) -> OffloadingManager:
        """Create the Manager lazily (only once) and cache it."""
        if not self._manager:
            kv_events_config = self.vllm_config.kv_events_config
            enable_events = (
                kv_events_config is not None and kv_events_config.enable_kv_cache_events
            )
            # Create the core Manager
            self._manager = CPUOffloadingManager(
                num_blocks=self.num_blocks,
                cache_policy=self.eviction_policy,
                enable_events=enable_events,
            )
            # Optional: wrap it in the reuse-frequency filter decorator
            store_threshold = int(self.extra_config.get("store_threshold", 0))
            if store_threshold >= 2:
                max_tracker_size = int(self.extra_config.get("max_tracker_size", 64_000))
                self._manager = FilterReusedOffloadingManager(
                    backing=self._manager,
                    store_threshold=store_threshold,
                    max_tracker_size=max_tracker_size,
                )
        return self._manager
    def get_handlers(self, kv_caches) -> Iterator:
        """Create the Handlers lazily (only once) and yield one handler per transfer direction."""
        if not self._handlers:
            if not current_platform.is_cuda_alike():
                raise Exception(
                    "CPU Offloading is currently only supported on CUDA-alike GPUs"
                )
            self._handlers = CpuGpuOffloadingHandlers(
                kv_caches=kv_caches,
                block_size_factor=self.block_size_factor,
                num_cpu_blocks=self.num_blocks,
            )
        # Yield both directions as (src_type, dst_type, handler)
        yield GPULoadStoreSpec, CPULoadStoreSpec, self._handlers.gpu_to_cpu_handler
        yield CPULoadStoreSpec, GPULoadStoreSpec, self._handlers.cpu_to_gpu_handler
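A worked sizing example helps when you choose cpu_bytes_to_use. The numbers below are made up and not measured from any model. The helper num_cpu_offload_blocks is hypothetical and repeats the arithmetic from CPUOffloadingSpec.__init__: take the per-block bytes on one rank, multiply by world_size and by block_size_factor, then integer-divide the budget by the result.

```python
def num_cpu_offload_blocks(cpu_bytes_to_use: int, total_gpu_kv_bytes: int,
                           num_gpu_blocks: int, world_size: int,
                           block_size_factor: int) -> int:
    """Hypothetical restatement of the CPU block-count computation."""
    if num_gpu_blocks <= 0:
        return 0
    kv_bytes_per_block = (total_gpu_kv_bytes // num_gpu_blocks) * world_size
    kv_bytes_per_offloaded_block = kv_bytes_per_block * block_size_factor
    if kv_bytes_per_offloaded_block == 0:
        return 0
    return cpu_bytes_to_use // kv_bytes_per_offloaded_block

GiB = 1 << 30
# Assumed: 20 GiB of KV cache per rank in 10_240 blocks (2 MiB each), world_size=2, factor 4.
# One offloaded block = 2 MiB * 2 * 4 = 16 MiB, so a 64 GiB budget holds 4096 of them.
n = num_cpu_offload_blocks(64 * GiB, 20 * GiB, 10_240, world_size=2, block_size_factor=4)
print(n)  # 4096
```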
Chapter 7 cpu/gpu_worker.py: GPU↔CPU Transfer Handlers
7.1 compute_sub_block_ptrs: Sub-Block Pointer Computation
def compute_sub_block_ptrs(
    block_ids: np.ndarray,   # array of block IDs
    block_size_factor: int,  # number of sub-blocks per block
    output: np.ndarray,      # output: preallocated int64 pointer array
    tensor: torch.Tensor,    # source/destination tensor
    skip_count: int = 0,     # number of sub-blocks to skip in the first block
):
    """
    Compute the byte address of every sub-block.
    Core formula:
        ptr(sub-block j of block b) = base_ptr + b * row_stride + j * sub_block_size
    What skip_count is for:
        when a request starts in the middle of a block (block_indices % block_size_factor != 0),
        the leading sub-blocks are skipped so that only the needed data is transferred.
    """
    assert skip_count < block_size_factor
    num_sub_blocks = len(output)
    base_ptr = tensor.data_ptr()   # base address of the tensor
    row_stride = tensor.stride(0)  # row stride (in elements, which equal bytes because the tensor is int8)
    if block_size_factor == 1:
        # Fast path: 1:1 mapping, no sub-block expansion needed
        output[:] = base_ptr + block_ids[:num_sub_blocks] * row_stride
        return
    # Vectorized expansion (block_size_factor > 1)
    assert tensor.shape[1] % block_size_factor == 0
    sub_block_size = tensor.shape[1] // block_size_factor  # sub-block size in bytes
    sub_offsets = np.arange(block_size_factor, dtype=np.int64) * sub_block_size
    # Broadcast to compute every sub-block pointer:
    # (num_blocks, 1) + (1, block_size_factor) → (num_blocks, block_size_factor)
    all_ptrs = (
        base_ptr + block_ids.astype(np.int64)[:, np.newaxis] * row_stride
    ) + sub_offsets[np.newaxis, :]
    # Flatten and apply skip_count
    flat = all_ptrs.ravel()
    output[:] = flat[skip_count : skip_count + num_sub_blocks]
Sub-block pointer example:
block_size_factor = 2, skip_count = 1
CPU block layout:
  Block 0: [sub0 | sub1]    row_stride = 2 * sub_block_size
  Block 1: [sub0 | sub1]
  Block 2: [sub0 | sub1]
The GPU side needs 4 sub-blocks: sub1 of Block 0, sub0 + sub1 of Block 1, and sub0 of Block 2.
all_ptrs:
  Block 0: [ptr(0,0), ptr(0,1)]
  Block 1: [ptr(1,0), ptr(1,1)]
  Block 2: [ptr(2,0), ptr(2,1)]
flat: [ptr(0,0), ptr(0,1), ptr(1,0), ptr(1,1), ptr(2,0), ptr(2,1)]
skip_count=1: output[:] = flat[1 : 1+4] = [ptr(0,1), ptr(1,0), ptr(1,1), ptr(2,0)]
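You can check the pointer arithmetic without a GPU by replacing tensor.data_ptr() and tensor.stride(0) with plain integers. The function sub_block_ptrs below is a hypothetical NumPy-only variant of compute_sub_block_ptrs. It takes base_ptr, row_stride and row_bytes instead of a tensor and returns the array instead of filling output. The base address 0x1000 and the 256-byte sub-block are arbitrary assumptions. The call reproduces the skip_count=1 example above.

```python
import numpy as np

def sub_block_ptrs(block_ids: np.ndarray, block_size_factor: int, num_sub_blocks: int,
                   base_ptr: int, row_stride: int, row_bytes: int,
                   skip_count: int = 0) -> np.ndarray:
    """Hypothetical tensor-free variant of compute_sub_block_ptrs."""
    assert skip_count < block_size_factor
    ids = block_ids.astype(np.int64)
    if block_size_factor == 1:
        return base_ptr + ids[:num_sub_blocks] * row_stride
    assert row_bytes % block_size_factor == 0
    sub_block_size = row_bytes // block_size_factor
    sub_offsets = np.arange(block_size_factor, dtype=np.int64) * sub_block_size
    # (num_blocks, 1) + (1, factor) -> (num_blocks, factor), then flatten row-major.
    all_ptrs = (base_ptr + ids[:, np.newaxis] * row_stride) + sub_offsets[np.newaxis, :]
    return all_ptrs.ravel()[skip_count:skip_count + num_sub_blocks]

base, sub = 0x1000, 256  # assumed base address and sub-block size (bytes)
ptrs = sub_block_ptrs(np.array([0, 1, 2]), 2, 4, base,
                      row_stride=2 * sub, row_bytes=2 * sub, skip_count=1)
print([hex(p) for p in ptrs])  # ['0x1100', '0x1200', '0x1300', '0x1400']
```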
7.2 SingleDirectionOffloadingHandler: One-Way Transfer Handler
class SingleDirectionOffloadingHandler(OffloadingHandler):
    """
    Handles asynchronous transfers in one direction (GPU→CPU or CPU→GPU).
    Transfers are guaranteed to execute in order: each transfer uses its own CUDA stream,
    and each new stream calls wait_event on the previous transfer before it starts.
    Pipeline:
    [Transfer1] ──wait_event──→ [Transfer2] ──wait_event──→ [Transfer3]
      │ stream1                   │ stream2                   │ stream3
      │                           │                           │
    start_ev end_ev             start_ev end_ev             start_ev end_ev
    """
    def __init__(self, gpu_tensors, cpu_tensors, block_size_factor,
                 kv_cache_groups_data_refs, gpu_to_cpu):
        # Sanity checks: GPU tensors live on CUDA, CPU tensors on the CPU
        for gpu_tensor, cpu_tensor in zip(gpu_tensors, cpu_tensors):
            assert gpu_tensor.dtype == torch.int8
            assert gpu_tensor.ndim == 2
            assert gpu_tensor.is_cuda
            assert cpu_tensor.dtype == torch.int8
            assert cpu_tensor.ndim == 2
            assert cpu_tensor.device.type == "cpu"
            # CPU page size = GPU page size × block_size_factor
            _, gpu_page_size = gpu_tensor.shape
            _, cpu_page_size = cpu_tensor.shape
            assert cpu_page_size == gpu_page_size * block_size_factor
        # Set source and destination according to the direction
        self.src_tensors = gpu_tensors if gpu_to_cpu else cpu_tensors
        self.dst_tensors = cpu_tensors if gpu_to_cpu else gpu_tensors
        self.gpu_to_cpu = gpu_to_cpu
        # Direction-dependent block size factors:
        # GPU→CPU: src_block_size_factor=1 (GPU blocks), dst_block_size_factor=block_size_factor (CPU blocks)
        # CPU→GPU: src_block_size_factor=block_size_factor (CPU blocks), dst_block_size_factor=1 (GPU blocks)
        self.src_block_size_factor = 1 if self.gpu_to_cpu else block_size_factor
        self.dst_block_size_factor = block_size_factor if self.gpu_to_cpu else 1
        self.transfer_type = ("GPU", "CPU") if self.gpu_to_cpu else ("CPU", "GPU")
        # Transfer bookkeeping
        self._transfer_events: dict[int, torch.Event] = {}  # job_id → end_event
        self._transfers: deque[Transfer] = deque()          # ordered transfer queue
        self._stream_pool: list[torch.cuda.Stream] = []     # reusable CUDA streams
        self._event_pool: list[torch.Event] = []            # reusable CUDA events
7.3 transfer_async — 异步传输提交
def transfer_async(self, job_id: int, transfer_spec: TransferSpec) -> bool:
    src_spec, dst_spec = transfer_spec
    # Per-side block IDs; this excerpt used src_blocks/dst_blocks without
    # defining them (assumed: the block_ids attribute of BlockIDsLoadStoreSpec)
    src_blocks = src_spec.block_ids
    dst_blocks = dst_spec.block_ids
    # Take the GPU-side spec: whatever the direction, only the GPU spec carries group info
    gpu_spec = src_spec if self.gpu_to_cpu else dst_spec
    group_sizes = gpu_spec.group_sizes
    block_indices = gpu_spec.block_indices
    # Total number of copy ops = sum(group_size × len(group_data_refs))
    num_copy_ops = 0
    for group_size, group_data_refs in zip(group_sizes, self.kv_cache_groups_data_refs):
        num_copy_ops += group_size * len(group_data_refs)
    # Preallocate one slot per copy op
    all_src = np.empty(num_copy_ops, dtype=np.int64)  # source pointers
    all_dst = np.empty(num_copy_ops, dtype=np.int64)  # destination pointers
    all_sizes = np.empty(num_copy_ops, dtype=np.int64)  # bytes per op
    # Compute pointers group by group
    src_offset = 0
    dst_offset = 0
    op_idx = 0
    num_transfer_bytes = 0
    for group_size, block_idx, group_data_refs in zip(
        group_sizes, block_indices, self.kv_cache_groups_data_refs
    ):
        if group_size == 0:
            continue
        # Position of the first block inside its physical block
        # (block_idx % block_size_factor = sub-block offset)
        src_logical_blocks_to_skip = block_idx % self.src_block_size_factor
        dst_logical_blocks_to_skip = block_idx % self.dst_block_size_factor
        # Logical block span, including the leading sub-blocks that are skipped
        src_logical_blocks_count = group_size + src_logical_blocks_to_skip
        dst_logical_blocks_count = group_size + dst_logical_blocks_to_skip
        # Number of physical blocks actually touched (ceiling division)
        dst_blocks_count = cdiv(dst_logical_blocks_count, self.dst_block_size_factor)
        src_blocks_count = cdiv(src_logical_blocks_count, self.src_block_size_factor)
        # Slice out this group's block IDs
        group_src = src_blocks[src_offset:src_offset + src_blocks_count]
        group_dst = dst_blocks[dst_offset:dst_offset + dst_blocks_count]
        # Compute pointers for every data_ref (one per layer or layer group)
        for data_ref in group_data_refs:
            t_idx = data_ref.tensor_idx
            end_idx = op_idx + group_size
            # Sub-block pointers on the source and destination sides
            compute_sub_block_ptrs(
                group_src, self.src_block_size_factor,
                all_src[op_idx:end_idx],
                self.src_tensors[t_idx],
                skip_count=src_logical_blocks_to_skip,
            )
            compute_sub_block_ptrs(
                group_dst, self.dst_block_size_factor,
                all_dst[op_idx:end_idx],
                self.dst_tensors[t_idx],
                skip_count=dst_logical_blocks_to_skip,
            )
            all_sizes[op_idx:end_idx] = data_ref.page_size_bytes
            num_transfer_bytes += group_size * data_ref.page_size_bytes
            op_idx = end_idx
        src_offset += src_blocks_count
        dst_offset += dst_blocks_count
    # Wrap as torch tensors (zero-copy: they share the NumPy buffers)
    batch_src = torch.from_numpy(all_src)
    batch_dst = torch.from_numpy(all_dst)
    batch_sizes = torch.from_numpy(all_sizes)
    # Take a Stream and Events from the pools, or create new ones
    stream = self._stream_pool.pop() if self._stream_pool else torch.cuda.Stream()
    start_event = self._event_pool.pop() if self._event_pool else torch.Event(enable_timing=True)
    end_event = self._event_pool.pop() if self._event_pool else torch.Event(enable_timing=True)
    # Set up dependencies
    if self.gpu_to_cpu:
        # GPU→CPU: wait for work already queued on the current CUDA stream
        # (model compute), so KV data that is still being written is not copied
        stream.wait_stream(torch.cuda.current_stream())
    if self._transfers:
        # Wait for the previous transfer to finish, preserving submission order
        last_transfer = self._transfers[-1]
        stream.wait_event(last_transfer.end_event)
    # Run the batched block copy on the dedicated stream
    with torch.cuda.stream(stream):
        start_event.record(stream)
        if num_copy_ops > 0:
            # Core operation: one batched DMA submission.
            # swap_blocks_batch takes three arrays (src_ptrs, dst_ptrs, sizes);
            # element i is one memcpy of sizes[i] bytes from src_ptrs[i] to dst_ptrs[i]
            ops.swap_blocks_batch(batch_src, batch_dst, batch_sizes)
        end_event.record(stream)
    # Record the transfer bookkeeping
    self._transfer_events[job_id] = end_event
    self._transfers.append(
        Transfer(job_id=job_id, stream=stream,
                 start_event=start_event, end_event=end_event,
                 num_bytes=num_transfer_bytes)
    )
    return True
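The per-group arithmetic in `transfer_async` (skip count, ceiling division, sub-block pointers) can be checked with small numbers. This is a sketch under stated assumptions: `group_block_span` and `sub_block_ptrs` are hypothetical helpers, the latter only a stand-in for vLLM's `compute_sub_block_ptrs`, whose real signature and internals are not shown in this document. It assumes each physical row holds `factor` equally sized sub-blocks and that consecutive rows are `row_stride` bytes apart, which is how the interleaved CPU tensors of Chapter 8 are laid out.

```python
def cdiv(a: int, b: int) -> int:
    # Ceiling division for non-negative a and positive b
    return -(-a // b)


def group_block_span(group_size: int, block_idx: int, factor: int) -> tuple[int, int]:
    # Returns (logical_blocks_to_skip, physical_blocks_count) for one side,
    # following the same two steps as transfer_async.
    skip = block_idx % factor
    return skip, cdiv(group_size + skip, factor)


def sub_block_ptrs(block_ids: list[int], factor: int, base_ptr: int,
                   row_stride: int, row_bytes: int, skip: int, count: int) -> list[int]:
    # Hypothetical stand-in for compute_sub_block_ptrs: logical sub-block j
    # (counted from the group's first physical block) lives in row
    # block_ids[j // factor], at byte offset (j % factor) * sub_bytes.
    sub_bytes = row_bytes // factor
    ptrs = []
    for j in range(skip, skip + count):
        row = block_ids[j // factor]
        ptrs.append(base_ptr + row * row_stride + (j % factor) * sub_bytes)
    return ptrs


# 4 GPU blocks whose first logical index is 6, block_size_factor = 4
cpu_skip, cpu_blocks = group_block_span(group_size=4, block_idx=6, factor=4)
gpu_skip, gpu_blocks = group_block_span(group_size=4, block_idx=6, factor=1)
# CPU rows 5 and 9, 400-byte rows, no other Workers interleaved (row_stride == row_bytes)
cpu_ptrs = sub_block_ptrs([5, 9], 4, base_ptr=1000, row_stride=400,
                          row_bytes=400, skip=cpu_skip, count=4)
```

With a first block index of 6 and `block_size_factor = 4`, the CPU side skips 2 sub-blocks and touches 2 CPU blocks, while the GPU side (factor 1) skips nothing and touches 4 blocks; the 4 CPU pointers land in the last two quarters of row 5 and the first two quarters of row 9.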
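CUDA stream dependency graph (GPU→CPU case; every offload stream also waits on the default stream at submission time, only the first such edge is drawn):
default_stream (model compute)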
│
│ wait_stream
▼
stream_1 (Transfer 1: GPU→CPU)
│
│ wait_event(end_event_1)
▼
stream_2 (Transfer 2: GPU→CPU)
│
│ wait_event(end_event_2)
▼
stream_3 (Transfer 3: GPU→CPU)
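Why does this chain make transfers finish in submission order? A toy timing model helps. It is a simplification, not a measurement: each job is described by a hypothetical `(ready_time, duration)` pair, where `ready_time` stands for the moment the work already queued on the default stream completes (the `wait_stream` edge), and the previous job's end time plays the role of the `wait_event` edge.

```python
def chained_schedule(jobs: list[tuple[float, float]]) -> list[tuple[float, float]]:
    # jobs: (ready_time, duration) per transfer, in submission order.
    # start = max(ready_time, previous end) models wait_stream + wait_event
    schedule = []
    prev_end = 0.0
    for ready_time, duration in jobs:
        start = max(ready_time, prev_end)
        end = start + duration
        schedule.append((start, end))
        prev_end = end
    return schedule


# Job 2 is ready at t=1 but must wait for job 1 to end at t=2
timeline = chained_schedule([(0.0, 2.0), (1.0, 1.0), (5.0, 0.5)])
```

Because every start time is at least the previous end time, end times never decrease, which is exactly the property `get_finished` relies on when it only inspects the head of the deque.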
7.4 get_finished — Querying completed transfers
def get_finished(self) -> list[TransferResult]:
    results = []
    # The head of the deque is the earliest submitted transfer;
    # Event.query() checks for completion without blocking
    while self._transfers and self._transfers[0].end_event.query():
        transfer = self._transfers.popleft()
        # Transfer time: elapsed_time() returns milliseconds, convert to seconds
        transfer_time = transfer.start_event.elapsed_time(transfer.end_event) * 1e-3
        result = TransferResult(
            job_id=transfer.job_id,
            success=True,
            transfer_size=transfer.num_bytes,
            transfer_time=transfer_time,
            transfer_type=self.transfer_type,
        )
        results.append(result)
        # Return the Stream and both Events to their pools
        self._stream_pool.append(transfer.stream)
        self._event_pool.append(transfer.end_event)
        self._event_pool.append(transfer.start_event)
        del self._transfer_events[transfer.job_id]
    return results
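The in-order polling and object pooling of `get_finished` can be mimicked in pure Python. In this sketch `FakeEvent`, `PendingTransfer`, and `InOrderPoller` are hypothetical stand-ins, and a boolean flag replaces a real CUDA event's `query()`.

```python
from collections import deque
from dataclasses import dataclass


@dataclass
class FakeEvent:
    # Stand-in for a CUDA event: `done` becomes True when the "device" finishes
    done: bool = False

    def query(self) -> bool:
        return self.done


@dataclass
class PendingTransfer:
    job_id: int
    end_event: FakeEvent


class InOrderPoller:
    def __init__(self) -> None:
        self._transfers: deque[PendingTransfer] = deque()  # oldest first
        self._event_pool: list[FakeEvent] = []  # recycled events

    def submit(self, job_id: int) -> FakeEvent:
        # Reuse a pooled event when available, otherwise create one
        event = self._event_pool.pop() if self._event_pool else FakeEvent()
        event.done = False
        self._transfers.append(PendingTransfer(job_id, event))
        return event

    def get_finished(self) -> list[int]:
        finished = []
        # Stop at the first unfinished head, as get_finished does
        while self._transfers and self._transfers[0].end_event.query():
            transfer = self._transfers.popleft()
            finished.append(transfer.job_id)
            self._event_pool.append(transfer.end_event)  # return to the pool
        return finished


poller = InOrderPoller()
e1, e2 = poller.submit(1), poller.submit(2)
e2.done = True  # cannot happen under the stream chain; shows head-of-line behavior
blocked = poller.get_finished()
e1.done = True
drained = poller.get_finished()
e3 = poller.submit(3)
```

Stopping at the first unfinished transfer means each poll costs at most one `query()` beyond the finished transfers, and it is safe only because the stream chain in 7.3 guarantees FIFO completion.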
Chapter 8 cpu/shared_offload_region.py — mmap shared memory
Figure 16: Sequence diagram of mmap file-creation coordination
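The coordination in the figure reduces to a race on `O_CREAT | O_EXCL`: exactly one process creates and sizes the file, and everyone else opens it and waits for the size to appear. A minimal sketch, assuming a POSIX system; `open_shared_file` and `wait_for_file_size` are hypothetical names (vLLM's own helper is `_wait_for_file_size`, whose body is not shown in this document), and the timeout is an addition of this sketch.

```python
import os
import time


def wait_for_file_size(fd: int, size: int, timeout_s: float) -> None:
    # Hypothetical helper: poll fstat until the creator's ftruncate is visible
    deadline = time.monotonic() + timeout_s
    while os.fstat(fd).st_size < size:
        if time.monotonic() > deadline:
            raise TimeoutError(f"file did not reach {size} bytes in time")
        time.sleep(0.001)


def open_shared_file(path: str, size: int, timeout_s: float = 5.0) -> tuple[int, bool]:
    # Returns (fd, is_creator); only one caller can win the O_EXCL race
    try:
        fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_RDWR, 0o600)
    except FileExistsError:
        fd = os.open(path, os.O_RDWR)
        try:
            wait_for_file_size(fd, size, timeout_s)
        except BaseException:
            os.close(fd)  # do not leak the descriptor on timeout
            raise
        return fd, False
    os.ftruncate(fd, size)  # the creator sizes the file for everyone
    return fd, True
```

Because `O_EXCL` makes creation atomic, no extra lock file is needed; the only window the followers must cover is between the creator's `open` and its `ftruncate`.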
8.1 SharedOffloadRegion: full line-by-line walkthrough
class SharedOffloadRegion:
    """
    mmap-backed shared memory region.
    Core design:
    1. A file on /dev/shm is shared by multiple Workers
    2. The first Worker (the creator) creates the file with O_EXCL and sizes it with ftruncate
    3. Later Workers open the existing file and wait until it reaches the expected size
    4. Interleaved layout: each block row holds the data of all Workers
    Layout example (2 Workers, 3 blocks):
        Row 0: [Worker0_Block0 | Worker1_Block0]
        Row 1: [Worker0_Block1 | Worker1_Block1]
        Row 2: [Worker0_Block2 | Worker1_Block2]
    row_stride = cpu_page_size * num_workers
    Offset of a Worker within each row = rank * cpu_page_size
    """
    def __init__(self, instance_id, total_size_bytes, num_blocks, rank, num_workers, cpu_page_size):
        self.page_size = mmap.PAGESIZE
        self.total_size_bytes = total_size_bytes
        self.mmap_path = f"/dev/shm/vllm_offload_{instance_id}.mmap"
        self._creator = False
        self.num_blocks = num_blocks
        self.rank = rank
        self._row_stride = cpu_page_size * num_workers  # row stride
        if rank is not None:
            self._worker_offset = rank * cpu_page_size  # this Worker's offset within each row
            self._worker_area_end = (rank + 1) * cpu_page_size  # end of this Worker's area
        try:
            # O_CREAT | O_EXCL: exclusive creation, only the first Worker succeeds
            self.fd = os.open(self.mmap_path, os.O_CREAT | os.O_EXCL | os.O_RDWR, 0o600)
            os.ftruncate(self.fd, self.total_size_bytes)  # grow the file to the target size
            self._creator = True
        except FileExistsError:
            # Later Workers open the existing file
            self.fd = os.open(self.mmap_path, os.O_RDWR)
            # Busy-wait until the file reaches the expected size (the creator may not have run ftruncate yet)
            _wait_for_file_size(self.fd, self.total_size_bytes)
        # Create the shared mmap mapping
        self.mmap_obj = mmap.mmap(
            self.fd, self.total_size_bytes,
            flags=mmap.MAP_SHARED,  # shared mapping (visible across processes)
            prot=mmap.PROT_READ | mmap.PROT_WRITE,  # read/write access
        )
        # MADV_POPULATE_WRITE: prefault physical pages so the first access does not page-fault.
        # This matters for DMA transfers: page faults would slow them down
        _MADV_POPULATE_WRITE = getattr(mmap, "MADV_POPULATE_WRITE", 23)
        if rank is not None:
            # Only populate this Worker's pages (saves memory)
            for block in range(num_blocks):
                raw_offset = block * self._row_stride + self._worker_offset
                # madvise() requires a page-aligned start, so round down to a page boundary
                aligned_offset = (raw_offset // self.page_size) * self.page_size
                end = raw_offset + cpu_page_size
                aligned_length = end - aligned_offset
                self.mmap_obj.madvise(_MADV_POPULATE_WRITE, aligned_offset, aligned_length)
        else:
            # No rank → populate the whole region
            self.mmap_obj.madvise(_MADV_POPULATE_WRITE, 0, self.total_size_bytes)
        # Expose the mmap as a torch int8 tensor
        self._base = torch.frombuffer(memoryview(self.mmap_obj), dtype=torch.int8)
        self._views: list[torch.Tensor] = []
        self.is_pinned: bool = False  # whether registered as CUDA pinned memory
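`mmap.madvise` requires a page-aligned start offset, which is why the loop rounds each Worker's slice down to a page boundary and lengthens the range to compensate. The hypothetical `populate_ranges` helper below replays that arithmetic without touching any memory; the sizes in the usage line are illustrative assumptions.

```python
def populate_ranges(num_blocks: int, row_stride: int, worker_offset: int,
                    cpu_page_size: int, page_size: int) -> list[tuple[int, int]]:
    # One (aligned_offset, aligned_length) pair per block row, as in __init__
    ranges = []
    for block in range(num_blocks):
        raw_offset = block * row_stride + worker_offset
        aligned_offset = (raw_offset // page_size) * page_size  # round down to a page
        end = raw_offset + cpu_page_size  # end of this Worker's bytes in the row
        ranges.append((aligned_offset, end - aligned_offset))
    return ranges


# rank 1 of 2 Workers, 6 KiB per Worker per block, 4 KiB OS pages
ranges = populate_ranges(num_blocks=2, row_stride=2 * 6144, worker_offset=6144,
                         cpu_page_size=6144, page_size=4096)
```

The rounded-down start can reach into the neighboring Worker's bytes (4096 to 6144 in this example). That is harmless: `MADV_POPULATE_WRITE` only prefaults page tables and does not modify the data already in those pages.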
8.2 create_next_view — Creating per-layer views
def create_next_view(self, tensor_page_size: int) -> torch.Tensor:
    """
    Allocate a view for one layer inside the current Worker's area.
    Each call advances _worker_offset, so consecutive layers are laid out
    back to back in memory (within the Worker's area).
    Returns an int8 tensor with shape = (num_blocks, tensor_page_size)
    and stride = (row_stride, 1).
    as_strided builds a non-contiguous view:
    - row stride = row_stride (skips over the other Workers' data)
    - column stride = 1 (one byte per int8 element)
    so swap_blocks_batch can address each block directly by its row index.
    """
    assert self.rank is not None
    new_offset = self._worker_offset + tensor_page_size
    assert new_offset <= self._worker_area_end  # must stay inside this Worker's area
    worker_layer_view = torch.as_strided(
        self._base,
        size=(self.num_blocks, tensor_page_size),
        stride=(self._row_stride, 1),  # the row stride spans the other Workers
        storage_offset=self._worker_offset,  # start at this Worker's current offset
    )
    self._worker_offset = new_offset  # advance the offset for the next layer
    self._views.append(worker_layer_view)
    return worker_layer_view
Example memory layout of the layer views (2 Workers, 2 layers, 3 blocks):
/dev/shm/vllm_offload_xxx.mmap
Worker0 area: Worker1 area:
┌────────────────┐ ┌────────────────┐
│ W0_L0_Block0 │ │ W1_L0_Block0 │ ← Row 0 (row_stride = cpu_page_size * 2)
│ W0_L1_Block0 │ │ W1_L1_Block0 │
├────────────────┤ ├────────────────┤
│ W0_L0_Block1 │ │ W1_L0_Block1 │ ← Row 1
│ W0_L1_Block1 │ │ W1_L1_Block1 │
├────────────────┤ ├────────────────┤
│ W0_L0_Block2 │ │ W1_L0_Block2 │ ← Row 2
│ W0_L1_Block2 │ │ W1_L1_Block2 │
└────────────────┘ └────────────────┘
W0_L0 view: stride = (2 * cpu_page_size, 1), offset = 0
W0_L1 view: stride = (2 * cpu_page_size, 1), offset = L0_page_size
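The claim that the per-layer views tile the mmap without overlap can be checked by replaying the `create_next_view` offsets together with the `as_strided` indexing rule (element `[i, j]` sits at `storage_offset + i * row_stride + j`). Assumption: the layer page sizes of one Worker sum exactly to `cpu_page_size`; the source only asserts that they do not exceed it. `layer_view_offsets` and `byte_owner_map` are hypothetical helpers.

```python
def layer_view_offsets(num_workers: int, layer_page_sizes: list[int]) -> dict[tuple[int, int], int]:
    # storage_offset of each (rank, layer) view, replaying create_next_view
    cpu_page_size = sum(layer_page_sizes)  # assumption: layers fill the Worker area
    offsets = {}
    for rank in range(num_workers):
        worker_offset = rank * cpu_page_size
        for layer, size in enumerate(layer_page_sizes):
            offsets[(rank, layer)] = worker_offset
            worker_offset += size  # the next layer starts right after this one
    return offsets


def byte_owner_map(num_blocks: int, num_workers: int, layer_page_sizes: list[int]) -> list:
    # Label every byte of the region with the (rank, layer, block) view that covers it
    row_stride = sum(layer_page_sizes) * num_workers
    owners = [None] * (row_stride * num_blocks)
    for (rank, layer), offset in layer_view_offsets(num_workers, layer_page_sizes).items():
        for block in range(num_blocks):
            for col in range(layer_page_sizes[layer]):
                # as_strided rule: element [block, col] = offset + block * row_stride + col
                idx = offset + block * row_stride + col
                if owners[idx] is not None:
                    raise ValueError(f"views overlap at byte {idx}")
                owners[idx] = (rank, layer, block)
    return owners


# 2 Workers, 2 layers (4 and 2 bytes per block), 3 blocks, as in the diagram
owners = byte_owner_map(num_blocks=3, num_workers=2, layer_page_sizes=[4, 2])
```

Every byte ends up owned by exactly one view, and the owners within a row follow the order W0_L0, W0_L1, W1_L0, W1_L1 shown in the diagram.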
8.3 cleanup — Releasing resources
def cleanup(self) -> None:
    # 1. If the region was registered as pinned memory, unregister it first
    if self.is_pinned and self._base is not None:
        base_ptr = self._base.data_ptr()
        result = torch.cuda.cudart().cudaHostUnregister(base_ptr)
        if result.value != 0:
            logger.warning("cudaHostUnregister failed for rank=%d", self.rank)
        self.is_pinned = False
    # 2. Drop the views (releases their references to _base so the StorageImpl refcount can reach zero)
    if self._views is not None:
        self._views.clear()
    # 3. Drop the torch tensor (releases its reference to the mmap buffer)
    self._base = None
    # 4. Close the mmap mapping
    if self.mmap_obj:
        self.mmap_obj.close()
        self.mmap_obj = None
    # 5. Close the file descriptor
    if self.fd is not None:
        os.close(self.fd)
        self.fd = None
    # 6. Only the creator deletes the file (avoids several Workers racing to unlink it)
    if self._creator and self.mmap_path:
        os.unlink(self.mmap_path)
        self._creator = False
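Steps 2 and 3 have to come before step 4 because Python's `mmap.close()` refuses to close a mapping while a buffer export (such as the `memoryview` handed to `torch.frombuffer`) is still alive, and raises `BufferError` instead; this assumes the tensor and its views hold the last references to that `memoryview`. The stdlib-only sketch below demonstrates the rule on a temporary file. It assumes a Unix platform (for `MAP_SHARED` and `PROT_*`), and `demo_close_order` is a hypothetical name.

```python
import mmap
import os
import tempfile


def demo_close_order(size: int = mmap.PAGESIZE) -> str:
    fd, path = tempfile.mkstemp()
    try:
        os.ftruncate(fd, size)
        mm = mmap.mmap(fd, size, flags=mmap.MAP_SHARED,
                       prot=mmap.PROT_READ | mmap.PROT_WRITE)
        view = memoryview(mm)  # plays the role of memoryview(mmap_obj) behind _base
        try:
            mm.close()  # expected to fail while the export is alive
            outcome = "closed with a live view"
        except BufferError:
            outcome = "BufferError"
        view.release()  # like dropping _views and _base (steps 2 and 3)
        mm.close()  # now succeeds (step 4)
        return outcome
    finally:
        os.close(fd)  # step 5
        os.unlink(path)  # step 6 (this demo is the only "creator")
```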
Chapter 9 worker/worker.py — Worker-side abstractions
9.1 The OffloadingHandler abstraction
TransferSpec = tuple[LoadStoreSpec, LoadStoreSpec]  # (src_spec, dst_spec)
TransferType = tuple[str, str]  # (src_medium, dst_medium)
@dataclass
class TransferResult:
    job_id: int
    success: bool
    transfer_size: int | None = None  # bytes transferred
    transfer_time: float | None = None  # transfer time (seconds)
    transfer_type: TransferType | None = None  # transfer direction
class OffloadingHandler(ABC):
    """Worker-side abstraction of an asynchronous transfer handler"""
    @abstractmethod
    def transfer_async(self, job_id: int, spec: TransferSpec) -> bool:
        """Submit an asynchronous transfer; return whether submission succeeded"""
    @abstractmethod
    def get_finished(self) -> list[TransferResult]:
        """Return the transfers that finished since the previous call"""
    @abstractmethod
    def wait(self, job_ids: set[int]) -> None:
        """Block until the given jobs complete"""
    def shutdown(self) -> None:
        """No-op by default"""
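To make the contract concrete, here is a hypothetical handler that satisfies the interface with synchronous copies between two dicts. It is a sketch only: `DictCopyHandler` is not part of vLLM, the interface is trimmed and copied so the block runs standalone, and a spec is assumed to be a pair of key lists rather than real `LoadStoreSpec` objects.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass


@dataclass
class TransferResult:
    job_id: int
    success: bool


class OffloadingHandler(ABC):
    # Trimmed copy of the interface above so that this sketch runs standalone
    @abstractmethod
    def transfer_async(self, job_id: int, spec) -> bool: ...

    @abstractmethod
    def get_finished(self) -> list[TransferResult]: ...

    @abstractmethod
    def wait(self, job_ids: set[int]) -> None: ...

    def shutdown(self) -> None:
        pass


class DictCopyHandler(OffloadingHandler):
    # Hypothetical handler: copies values between two dicts synchronously
    def __init__(self, src: dict, dst: dict) -> None:
        self.src, self.dst = src, dst
        self._done: list[TransferResult] = []

    def transfer_async(self, job_id: int, spec) -> bool:
        src_keys, dst_keys = spec
        if len(src_keys) != len(dst_keys):
            return False  # reject a malformed spec instead of raising
        for s, d in zip(src_keys, dst_keys):
            self.dst[d] = self.src[s]
        self._done.append(TransferResult(job_id, True))
        return True

    def get_finished(self) -> list[TransferResult]:
        done, self._done = self._done, []  # report each job exactly once
        return done

    def wait(self, job_ids: set[int]) -> None:
        pass  # copies complete inside transfer_async, so nothing is pending


handler = DictCopyHandler({0: "a", 1: "b"}, {})
ok = handler.transfer_async(7, ([0, 1], [10, 11]))
```

The key obligation is that `get_finished` reports each job exactly once, because the caller treats its return value as "newly completed since the last call".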
9.2 OffloadingWorker — Multi-handler router
class OffloadingWorker:
    """
    Manages several OffloadingHandlers and routes each transfer to the right handler by transfer type.
    Registration:
        register_handler(GPULoadStoreSpec, CPULoadStoreSpec, gpu_to_cpu_handler)
        → transfer_type = ("GPU", "CPU") → handler
    On transfer_async:
        spec = (GPULoadStoreSpec(...), CPULoadStoreSpec(...))
        → src.medium() = "GPU", dst.medium() = "CPU"
        → look up ("GPU", "CPU") → gpu_to_cpu_handler
    """
    def __init__(self):
        self.handlers: set[OffloadingHandler] = set()
        self.transfer_type_to_handler: dict[TransferType, OffloadingHandler] = {}
    def register_handler(self, src_cls, dst_cls, handler):
        transfer_type = (src_cls.medium(), dst_cls.medium())
        assert transfer_type not in self.transfer_type_to_handler  # duplicate registration is not allowed
        self.handlers.add(handler)
        self.transfer_type_to_handler[transfer_type] = handler
    def transfer_async(self, job_id: int, spec: TransferSpec) -> bool:
        src, dst = spec
        transfer_type = (src.medium(), dst.medium())
        handler = self.transfer_type_to_handler.get(transfer_type)
        assert handler is not None
        try:
            success = handler.transfer_async(job_id, spec)
        except Exception as e:
            logger.warning("Exception in %r transfer %d: %r", transfer_type, job_id, e)
            return False
        if not success:
            logger.warning("Failed to submit %r transfer %d", transfer_type, job_id)
        return success
    def get_finished(self) -> list[TransferResult]:
        """Collect finished transfers from all handlers"""
        finished = []
        for handler in self.handlers:
            finished.extend(handler.get_finished())
        return finished
    def wait(self, job_ids: set[int]) -> None:
        for handler in self.handlers:
            handler.wait(job_ids)
    def shutdown(self) -> None:
        for handler in self.handlers:
            handler.shutdown()
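The routing rule can be exercised with toy specs. In this sketch `Spec`, `GPUSpec`, `CPUSpec`, `EchoHandler`, and `Router` are hypothetical. Unlike `OffloadingWorker`, the router returns `False` for an unregistered transfer type instead of asserting, and it keeps handlers in a list deduplicated by identity so that polling order is deterministic.

```python
class Spec:
    # Hypothetical minimal spec: the medium name is a class attribute
    MEDIUM = "?"

    @classmethod
    def medium(cls) -> str:
        return cls.MEDIUM


class GPUSpec(Spec):
    MEDIUM = "GPU"


class CPUSpec(Spec):
    MEDIUM = "CPU"


class EchoHandler:
    # Hypothetical handler that "finishes" every job immediately
    def __init__(self) -> None:
        self.finished: list[int] = []

    def transfer_async(self, job_id: int, spec) -> bool:
        self.finished.append(job_id)
        return True

    def get_finished(self) -> list[int]:
        done, self.finished = self.finished, []
        return done


class Router:
    def __init__(self) -> None:
        self.handlers: list = []  # unique handlers, in registration order
        self.by_type: dict[tuple[str, str], object] = {}

    def register_handler(self, src_cls, dst_cls, handler) -> None:
        key = (src_cls.medium(), dst_cls.medium())
        if key in self.by_type:
            raise ValueError(f"duplicate handler for {key}")
        if all(h is not handler for h in self.handlers):
            self.handlers.append(handler)
        self.by_type[key] = handler

    def transfer_async(self, job_id: int, spec) -> bool:
        src, dst = spec
        handler = self.by_type.get((src.medium(), dst.medium()))
        if handler is None:
            return False  # OffloadingWorker asserts here instead
        try:
            return handler.transfer_async(job_id, spec)
        except Exception:
            return False  # as in OffloadingWorker, exceptions become failures

    def get_finished(self) -> list[int]:
        finished = []
        for handler in self.handlers:
            finished.extend(handler.get_finished())
        return finished


router = Router()
both_ways = EchoHandler()
router.register_handler(GPUSpec, CPUSpec, both_ways)
router.register_handler(CPUSpec, GPUSpec, both_ways)
```

Deduplicating handlers matters when one handler serves both directions: it is registered twice but reached once per `get_finished`, `wait`, or `shutdown` call, which is most important for `shutdown`.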
Chapter 10 cpu/common.py — CPULoadStoreSpec
class CPULoadStoreSpec(BlockIDsLoadStoreSpec):
    """CPU-side load/store spec; simpler than the GPU spec: no group_sizes/block_indices"""
    @staticmethod
    def medium() -> str:
        return "CPU"
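The CPU spec is simpler because a CPU block is a "large block" that merges block_size_factor GPU blocks, and a CPU block ID maps directly to a physical memory location (one row of the CPU tensor). The only extra information needed to locate sub-blocks inside a CPU block, namely the per-group sizes and the starting block_indices, already travels in the GPU spec, which transfer_async reads regardless of the transfer direction.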
That concludes Document 2. Next, see Document 3 (a collection of architecture diagrams, flowcharts, and class diagrams).