vllm分析(六)——KV cache offload
vllmkv cache offload的处理流程
KV cache offload到外部存储(比如本机cpu内存),依然使用connector接口。不管是pd分离场景的KV cache 传输还是KV cache offload, KV cache需要有两个过程:store(存储到外部) 和 load(从外部加载)。为了处理load和store,scheduler和worker之间需要传递信息。pd分离场景的kv cache的处理,参考上一篇博客。
OffloadingConnector 整体架构
┌─────────────────────────────────────────────────────────────────┐
│ vLLM Scheduler Process │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ OffloadingConnector (SCHEDULER) │ │
│ │ ┌─────────────────────────────────────────────────────┐ │ │
│ │ │ OffloadingConnectorScheduler │ │ │
│ │ │ - get_num_new_matched_tokens() → 可加载 token 数 │ │ │
│ │ │ - update_state_after_alloc() → 创建 load 作业 │ │ │
│ │ │ - build_connector_meta() → 构造 store 作业 │ │ │
│ │ │ - update_connector_output() → 处理 worker 完成信息 │ │ │
│ │ │ - _req_status, _jobs, _block_id_to_pending_jobs │ │ │
│ │ └─────────────────────────────────────────────────────┘ │ │
│ └───────────────────────────┬───────────────────────────────┘ │
└──────────────────────────────┼──────────────────────────────────┘
│ OffloadingConnectorMetadata
│ (load_jobs, store_jobs, jobs_to_flush)
▼
┌─────────────────────────────────────────────────────────────────┐
│ vLLM Worker Process │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ OffloadingConnector (WORKER) │ │
│ │ ┌─────────────────────────────────────────────────────┐ │ │
│ │ │ OffloadingConnectorWorker │ │ │
│ │ │ - register_kv_caches() → 注册 GPU KV cache │ │ │
│ │ │ - start_kv_transfers() → 提交 load 作业 │ │ │
│ │ │ - prepare_store_kv() → 延迟 store 作业 │ │ │
│ │ │ - get_finished() → 收集完成作业 → 返回元数据 │ │ │
│ │ │ - _load_jobs, _unsubmitted_store_jobs │ │ │
│ │ └───────────────────────────┬─────────────────────────┘ │ │
│ │ │ │ │
│ │ ┌───────────────────────────▼─────────────────────────┐ │ │
│ │ │ OffloadingWorker │ │ │
│ │ │ - 路由 (src_medium, dst_medium) → Handler │ │ │
│ │ │ - transfer_async() / get_finished() / wait() │ │ │
│ │ └───────────────────────────┬─────────────────────────┘ │ │
│ │ │ │ │
│ │ ┌───────────────────────────▼─────────────────────────┐ │ │
│ │ │ SingleDirectionOffloadingHandler (CPU offload) │ │ │
│ │ │ - GPU→CPU 或 CPU→GPU │ │ │
│ │ │ - CUDA stream + events 控制异步拷贝 │ │ │
│ │ │ - swap_blocks_batch 批量传输 │ │ │
│ │ └─────────────────────────────────────────────────────┘ │ │
│ └───────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│ KVConnectorOutput
│ (completed jobs)
▼
(回到调度器)
class OffloadingConnector(KVConnectorBase_V1, SupportsHMA):
def __init__(
self,
vllm_config: VllmConfig,
role: KVConnectorRole,
kv_cache_config: KVCacheConfig,
):
super().__init__(vllm_config, role, kv_cache_config)
spec = OffloadingSpecFactory.create_spec(vllm_config, kv_cache_config)
self.connector_scheduler: OffloadingConnectorScheduler | None = None
self.connector_worker: OffloadingConnectorWorker | None = None
if role == KVConnectorRole.SCHEDULER:
self.connector_scheduler = OffloadingConnectorScheduler(spec)
elif role == KVConnectorRole.WORKER:
self.connector_worker = OffloadingConnectorWorker(spec)
涉及的类:
OffloadingConnectorScheduler
OffloadingConnectorWorker
load 作业的处理流程
Scheduler 向 Worker 下发 load作业
┌─────────────────────────────────────────────────────────────────────────────┐
│ Scheduler 侧 (调度器) │
└─────────────────────────────────────────────────────────────────────────────┘
│
Step 1: 调度决策 (schedule)
├─ get_num_new_matched_tokens → 确定需要加载的 token 数
├─ update_state_after_alloc → 创建 load_jobs
└─ build_connector_meta → 封装到 SchedulerOutput.kv_connector_metadata
│
Step 2: 发送 SchedulerOutput 给 Worker (通过执行器)
│
▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ Worker 侧 (模型执行前) │
└─────────────────────────────────────────────────────────────────────────────┘
│
Step 3: GPUModelRunner.execute_model 调用 pre_forward
│
┌─────────────────────────────────────────────────────────────┐
│ ActiveKVConnector.pre_forward(scheduler_output) │
│ │
│ 1. 从 scheduler_output 取出 kv_connector_metadata │
│ 2. 调用 self.kv_connector.handle_preemptions(metadata) │
│ 3. 调用 self.kv_connector.bind_connector_metadata(metadata)│
│ 4. 调用 self.kv_connector.start_load_kv(forward_context) │
└─────────────────────────────────────────────────────────────┘
│
┌─────────────────────────────────────────────────────────────┐
│ OffloadingConnector.start_load_kv (继承自 KVConnectorBase) │
│ 调用 self.connector_worker.start_kv_transfers(metadata) │
│ → 提交所有 load_jobs 到 OffloadingWorker │
└─────────────────────────────────────────────────────────────┘
│
Step 4: OffloadingWorker 路由到 Handler,执行异步传输
(cpu_to_gpu_handler.transfer_async 发起异步拷贝)
│
Step 5: 模型执行 (model forward)
(load 作业在后台传输,不阻塞计算)
start_kv_transfers 中触发 store 和 load 操作,数据传输是异步的。 _unsubmitted_store_jobs存储的是上一轮提交的store任务。
# https://github.com/vllm-project/vllm/blob/v0.21.0/vllm/distributed/kv_transfer/kv_connector/v1/offloading_connector.py#L89
class OffloadingConnector(KVConnectorBase_V1, SupportsHMA):
def start_load_kv(self, forward_context: "ForwardContext", **kwargs) -> None:
assert self.connector_worker is not None
assert isinstance(self._connector_metadata, OffloadingConnectorMetadata)
self.connector_worker.start_kv_transfers(self._connector_metadata)
# https://github.com/vllm-project/vllm/blob/v0.21.0/vllm/distributed/kv_transfer/kv_connector/v1/offloading/worker.py#L295
class OffloadingConnectorWorker:
def start_kv_transfers(self, metadata: OffloadingConnectorMetadata):
for job_id, transfer_spec in self._unsubmitted_store_jobs:
success = self.worker.transfer_async(job_id, transfer_spec)
assert success
self._unsubmitted_store_jobs.clear()
for job_id, entry in metadata.load_jobs.items():
self._load_jobs[job_id] = entry.req_id
success = self.worker.transfer_async(job_id, entry.transfer_spec)
assert success
Worker向Scheduler返回load 任务完成状态
┌─────────────────────────────────────────────────────────────────────────────┐
│ Worker 侧 (模型执行后) │
└─────────────────────────────────────────────────────────────────────────────┘
│
Step 6: GPUModelRunner.execute_model 调用 post_forward
│
┌─────────────────────────────────────────────────────────────┐
│ ActiveKVConnector.post_forward(scheduler_output, │
│ wait_for_save=True) │
│ │
│ 1. 如果 wait_for_save: self.kv_connector.wait_for_save() │
│ 2. 调用 self.kv_connector.get_finished(finished_req_ids) │
│ 3. 获取 get_block_ids_with_load_errors() 等 │
│ 4. 返回 KVConnectorOutput │
└─────────────────────────────────────────────────────────────┘
│
┌─────────────────────────────────────────────────────────────┐
│ OffloadingConnector.get_finished (worker 侧) │
│ │
│ 1. 调用 self.connector_worker.prepare_store_kv(metadata) │
│ → 将 metadata.store_jobs 添加到 _unsubmitted_store_jobs │
│ (延迟提交 store 作业) │
│ 2. 调用 self.connector_worker.get_finished(finished_req_ids)│
│ → 返回 finished_sending, finished_recving │
│ (finished_recving 包含已完成 load 的请求 ID) │
└─────────────────────────────────────────────────────────────┘
│
Step 7: 返回 KVConnectorOutput 给 Scheduler
(包含 finished_recving、completed_jobs 等)
│
▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ Scheduler 侧 (调度器) │
└─────────────────────────────────────────────────────────────────────────────┘
│
Step 8: Scheduler.update_from_output 处理 KVConnectorOutput
│
┌─────────────────────────────────────────────────────────────┐
│ _update_from_kv_xfer_finished(kv_connector_output) │
│ - 调用 connector.update_connector_output() │
│ - 遍历 finished_recving: │
│ 将 req_id 加入 finished_recving_kv_req_ids │
│ - 遍历 finished_sending: 释放块 │
└─────────────────────────────────────────────────────────────┘
│
Step 9: 下一轮 schedule 中提升请求
_try_promote_blocked_waiting_request 检查 finished_recving_kv_req_ids
→ 调用 _update_waiting_for_remote_kv 缓存块,状态改为 WAITING
GPUModelRunner ,框图中分析的调用逻辑是Model Runner V2。相关说明:Model Runner V2:更模块化、更快速的 vLLM 执行核心
要点:
- post_forward 中的 get_finished 会触发 prepare_store_kv,从而将 store 作业存入 _unsubmitted_store_jobs 队列,但此时不启动传输。
- store 作业的实际传输将在下一次 start_kv_transfers(即下一个 step 的 pre_forward)开始时提交,实现与模型计算的重叠。
get_num_new_matched_tokens
get_num_new_matched_tokens(request, num_computed_tokens)
│
├─ 1. 获取或创建 RequestOffloadState
│ - 若新请求,创建并保存到 _req_status
│ - 否则复用现有状态,并清空之前的 block_ids
│
├─ 2. 更新 offload keys(调用 req_status.update_offload_keys())
│ - 基于 request.block_hashes 生成该请求的所有 offload keys
│ - 每个 offload key 标识一个外部存储块
│
├─ 3. 记录本地已计算 token 数
│ req_status.num_locally_computed_tokens = num_computed_tokens
│
├─ 4. 调用 _lookup(req_status) 获取命中 token 数
│
│
├─ 5. 若为新请求,更新命中的 offload 块数
│ req_status.update_num_hit_blocks(num_computed_tokens + (hit_tokens or 0))
│
├─ 6. 调用 _touch(req_status) 更新缓存访问热度
│ - 全注意力组:touch 所有 offload keys
│ - 滑动窗口组:只 touch 最近窗口内的 keys
│
└─ 7. 返回 (hit_tokens, bool(hit_tokens))
_lookup负责实际查询外部缓存中存在且可读的连续块,并返回可加载的 token 总数。
_touch 更新缓存管理器的访问热度, 用于 LRU/ARC 淘汰策略。
LRUCachePolicy : Least Recently Used cache policy
ARCCachePolicy : ARC (Adaptive Replacement Cache) cache policy
store 作业的处理流程
┌─────────────────────────────────────────────────────────────────────────────┐
│ Scheduler 侧 (调度器) │
└─────────────────────────────────────────────────────────────────────────────┘
│
Step 1: 构建 Store 作业 (_build_store_jobs)
│
┌─────────────────────────────────────────────────────────────┐
│ 遍历本轮调度的所有请求: │
│ - 计算 num_offloadable_tokens (本轮后 token 总数) │
│ - 找出需要存储的新 offload 块 (从 next_stored_block_idx) │
│ - 过滤跳过块 (block_id == 0) │
│ - 调用 manager.prepare_store() 处理缓存驱逐和分配 │
│ - 生成 job_id,记录作业状态 (pending_count = world_size) │
│ - 区分滑动窗口块和非滑动窗口块,加入监视集合 │
│ - 返回 store_jobs 字典 {job_id: TransferJob} │
└─────────────────────────────────────────────────────────────┘
│
Step 2: 封装到 ConnectorMetadata (build_connector_meta)
metadata.store_jobs = store_jobs
│
Step 3: 发送给 Worker (通过 SchedulerOutput)
│
▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ Worker 侧 (工作器) │
└─────────────────────────────────────────────────────────────────────────────┘
│
Step 4: 预处理阶段 (pre_forward)
handle_preemptions(metadata)
└─ 若有抢占,立即提交 _unsubmitted_store_jobs 中所有作业
(确保数据不丢失)
│
Step 5: 模型前向计算期间
(GPU 生成新 token,store 作业暂未启动)
│
Step 6: 后处理阶段 (post_forward → get_finished)
│
┌─────────────────────────────────────────────────────────────┐
│ OffloadingConnector.get_finished() │
│ │
│ 1. 调用 self.connector_worker.prepare_store_kv(metadata) │
│ → 遍历 metadata.store_jobs │
│ → 将 (job_id, transfer_spec) 追加到 │
│ _unsubmitted_store_jobs 列表 │
│ (仅暂存,不启动传输) │
│ │
│ 2. 调用 self.connector_worker.get_finished() │
│ → 返回已完成传输的作业信息(load 和已完成 store) │
└─────────────────────────────────────────────────────────────┘
│
Step 7: 返回 KVConnectorOutput 给 Scheduler
(包含 completed_jobs、finished_sending 等)
│
▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ 下一个 Step 的 Worker 侧 (延迟提交) │
└─────────────────────────────────────────────────────────────────────────────┘
│
Step 8: 下一轮 pre_forward → start_kv_transfers
│
┌─────────────────────────────────────────────────────────────┐
│ OffloadingConnectorWorker.start_kv_transfers(metadata) │
│ │
│ 1. 首先处理 _unsubmitted_store_jobs 中的延迟作业: │
│ for (job_id, spec) in _unsubmitted_store_jobs: │
│ worker.transfer_async(job_id, spec) │
│ _unsubmitted_store_jobs.clear() │
│ │
│ 2. 再处理本轮新的 load_jobs (如果有) │
└─────────────────────────────────────────────────────────────┘
│
Step 9: OffloadingWorker 路由到 Handler (GPU→CPU)
│
┌─────────────────────────────────────────────────────────────┐
│ gpu_to_cpu_handler.transfer_async(job_id, spec) │
│ - 解析 TransferSpec (src: GPU 块, dst: CPU 块) │
│ - 计算指针,在独立 CUDA stream 上异步拷贝 │
│ - 记录 Transfer 到队列,等待完成 │
└─────────────────────────────────────────────────────────────┘
│
Step 10: 后台异步传输 (与下一轮模型计算重叠)
│
▼
Step 11: 完成查询 (后续的 get_finished 轮询)
handler.get_finished() 检测到 end_event 完成
→ TransferResult 上报给 OffloadingWorker
→ OffloadingConnectorWorker 记录到 _connector_worker_meta
│
▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ Scheduler 侧 (调度器) │
└─────────────────────────────────────────────────────────────────────────────┘
│
Step 12: 调度器接收完成信息 (update_connector_output)
- 遍历 completed_jobs,更新 pending_count
- 当 pending_count == 0:
* manager.complete_store() (标记块为 ready)
* 清理 _block_id_to_pending_jobs 中的监视
* 删除作业记录
- 对于 finished_sending 中的 req_id,释放 GPU 块
_build_store_jobs 过滤出需要存储的 offload keys
new_offload_keys: list[OffloadKey] = []
for group_config, group_state in zip(
self.config.kv_group_configs, req_status.group_states
):
num_blocks = num_offloadable_tokens // group_config.offloaded_block_size
start_block_idx = group_state.next_stored_block_idx
if num_blocks <= start_block_idx:
continue
offload_keys = group_state.offload_keys[start_block_idx:num_blocks]
offload_block_ids = group_state.block_ids[
start_block_idx * block_size_factor
+ block_size_factor - 1 : num_blocks * block_size_factor : block_size_factor
]
assert len(offload_keys) == len(offload_block_ids)
for offload_key, block_id in zip(offload_keys, offload_block_ids):
if block_id != 0:
new_offload_keys.append(offload_key)
OffloadingWorker 与 Handler 交互流程
OffloadingConnectorWorker
│
│ register_handler(src_cls, dst_cls, handler)
▼
┌────────────────────────────────────────────────────────────┐
│ OffloadingWorker │
│ transfer_type_to_handler = { │
│ ("GPU", "CPU"): gpu_to_cpu_handler, │
│ ("CPU", "GPU"): cpu_to_gpu_handler, │
│ } │
│ handlers = {handler1, handler2} │
└────────────────────────────────────────────────────────────┘
│
│ transfer_async(job_id, (src_spec, dst_spec))
▼
确定 transfer_type = (src_spec.medium(), dst_spec.medium())
│
▼
查找 handler = transfer_type_to_handler[transfer_type]
│
▼
handler.transfer_async(job_id, spec)
│
▼
┌────────────────────────────────────────────────────────────┐
│ SingleDirectionOffloadingHandler │
│ - 解析 TransferSpec → 计算源/目标指针列表 │
│ - 分配/复用 CUDA stream 和 events │
│ - ops.swap_blocks_batch() 异步拷贝 │
│ - 将 Transfer 加入队列,记录 end_event │
└────────────────────────────────────────────────────────────┘
│
│ (后续轮询) get_finished()
▼
检查队列头部 end_event.query()
├─ 未完成 → 返回空列表
└─ 已完成 → 弹出 Transfer, 构造 TransferResult
(job_id, success, bytes, time, type)
│
▼
返回给 OffloadingWorker → 聚合 → OffloadingConnectorWorker
异步数据传输
提交传输 transfer_async
def transfer_async(self, job_id: int, transfer_spec: TransferSpec) -> bool:
# 1. 解析规格,计算所有需要拷贝的子块(sub-block)
# - 输入:src_block_ids, dst_block_ids, group_sizes, block_indices
# - 输出:flat_src_ptr, flat_dst_ptr, flat_sizes (numpy 数组)
#
# 2. 分配/复用资源
stream = self._stream_pool.pop() if self._stream_pool else torch.cuda.Stream()
start_event = self._event_pool.pop() if ... else torch.Event(enable_timing=True)
end_event = ...
#
# 3. 顺序保证:当前 stream 等待前一个传输的 end_event
if self._transfers:
last_event = self._transfers[-1].end_event
stream.wait_event(last_event)
#
# 4. 启动异步拷贝
with torch.cuda.stream(stream):
start_event.record(stream)
if num_copy_ops > 0:
ops.swap_blocks_batch(
batch_src, batch_dst, batch_sizes,
is_src_access_order_any=(not self.gpu_to_cpu) # CPU→GPU 时允许乱序读
)
end_event.record(stream)
#
# 5. 保存作业
self._transfer_events[job_id] = end_event
self._transfers.append(Transfer(job_id, stream, start_event, end_event, num_bytes))
return True
要点
- 批量传输:swap_blocks_batch 一次 kernel launch 传输所有不连续段,减少开销。
- 顺序串行:通过 stream.wait_event(上一个 end_event) 保证作业按提交顺序执行。
- 访问顺序优化:CPU→GPU 时源是 CPU 内存(不会被 GPU 写),设置 is_src_access_order_any=True 允许 DMA 乱序读,提升带宽。
- 函数内部最终通过 cudaMemcpyAsync(dst_ptr, src_ptr, size, cudaMemcpyDefault, stream) 执行拷贝。cudaMemcpyDefault 会根据 src_ptr 和 dst_ptr 所在的内存类型(CPU 或 GPU)自动推断传输方向,无需额外参数。
- 当使用较新 CUDA(≥12.8)的 cuMemcpyBatchAsync 或 ROCm(≥7.1)的 hipMemcpyBatchAsync 时,同样支持双向传输。对于 CUDA,还通过 is_src_access_order_any 参数优化源端访问顺序:
is_src_access_order_any = true(CPU→GPU 时)允许 DMA 乱序读取源数据,提升带宽。
is_src_access_order_any = false(GPU→CPU 时)要求遵守流顺序,确保数据一致性。
查询完成:get_finished
def get_finished(self) -> list[TransferResult]:
results = []
while self._transfers and self._transfers[0].end_event.query():
transfer = self._transfers.popleft()
# 计算耗时(毫秒→秒)
elapsed_ms = transfer.start_event.elapsed_time(transfer.end_event)
results.append(TransferResult(
job_id=transfer.job_id,
success=True,
transfer_size=transfer.num_bytes,
transfer_time=elapsed_ms * 1e-3,
transfer_type=self.transfer_type, # ("GPU","CPU") or ("CPU","GPU")
))
# 回收资源
self._stream_pool.append(transfer.stream)
self._event_pool.append(transfer.start_event)
self._event_pool.append(transfer.end_event)
del self._transfer_events[transfer.job_id]
return results
非阻塞轮询:通过 event.query() 检查是否完成,不等待。
资源复用:将 stream 和 events 放回池中,避免重复创建。
同步等待:wait
OffloadingConnectorWorker.handle_preemptions()
│
├─ 参数:kv_connector_metadata.jobs_to_flush (set[int])
│
└─ 调用 self.worker.wait(jobs_to_flush)
│
└─ OffloadingWorker.wait(job_ids: set[int])
│
└─ 遍历 self.handlers (每个 OffloadingHandler)
│
└─ 对每个 handler 调用 handler.wait(job_ids)
│
└─ SingleDirectionOffloadingHandler.wait(job_ids)
│
└─ 遍历 job_ids
│
└─ 对每个 job_id:
获取 _transfer_events.get(job_id) → event
if event exists:
event.synchronize() # 阻塞等待事件完成
同步等待用于 flush 场景:需要确保某些作业完成后再释放 GPU 块,通过 event.synchronize() 阻塞 host 直到传输完成。
OffloadingConnectorWorker.handle_preemptions
OffloadingWorker.wait
SingleDirectionOffloadingHandler.wait
CPUOffloadingManager
CPUOffloadingManager负责在 CPU 内存中管理 KV 块的缓存。它维护一个固定容量的块池,使用可插拔的淘汰策略(LRU/ARC),并提供引用计数、事件记录等功能。
lookup 函数
def lookup(self, key: OffloadKey, req_context: ReqContext) -> bool | None:
block = self._policy.get(key)
if block is None:
return False
if not block.is_ready:
return None # write in-flight; caller should retry
return True
查询缓存策略:若 key 不存在返回 False;若存在但 is_ready=False(正在写入,inflight),返回 None 表示延迟;否则返回 True。
prepare_load 函数
仅用于需要读取的块。增加引用计数,防止在读取期间被驱逐。
返回 CPULoadStoreSpec,包含 CPU 块 ID 列表,供工作器执行实际拷贝。
prepare_store 函数
为新块分配block,可能触发淘汰。
touch 函数
通知策略这些 key 被访问了,用于更新 LRU/ARC 的访问顺序,但不增加引用计数。
complete_load 函数
加载完成后减少引用计数。当引用计数为 0 时,该块可能在未来被淘汰。
多级缓存 TieringOffloadingManager
TieringOffloadingManager: Multi-tier KV cache offloading orchestrator.
This manager coordinates between a CPU primary tier (with direct GPU access)
and zero or more secondary tiers (Storage, Network, etc.) to provide
hierarchical KV cache offloading.
TieringOffloadingManager 是在v0.21.1rc中发布的。
SecondaryTierManager 定义了接口。ExampleSecondaryTierManager还处于demo状态,不具备实际能力。
Reference
[1] 提速30%:vLLM推理的Swap特性实践
[2] vllm分析(五)——pd分离kv cache的处理过程
[3] Multi-tier KV offloading via the vLLM offloading connector
[4] Inside vLLM’s New KV Offloading Connector: Smarter Memory Transfer for Maximizing Inference Throughput
[5] [RFC]: KV cache offloading
[6] [RFC]: Progressive KV Cache CPU Onloading
更多推荐

所有评论(0)