【vllm】(v1 Engine) vLLM V1 Engine Deep Dive, Part 1: Core Engine and Data Definitions
Version analyzed: the vLLM v1/engine module source
Scope: `__init__.py` (data definitions) + `core.py` (core engine)
Depth: line by line
I. Module Positioning
1.1 Module Architecture Overview
vllm/v1/engine/ is the core engine layer of the vLLM v1 architecture. It sits between the frontend above and the scheduler/executor below:
```
┌─────────────────────────────────────────┐
│ API Server / Frontend (LLMEngine)       │ ← external interface
├─────────────────────────────────────────┤
│ EngineCoreClient                        │ ← IPC client
├─────────────────────────────────────────┤
│ ★ EngineCore / EngineCoreProc ★         │ ← this module
│ (schedule + execute + output)           │
├─────────────────────────────────────────┤
│ Scheduler                               │ ← request scheduling
│ ModelExecutor                           │ ← model execution
│ KV Cache Manager                        │ ← cache management
└─────────────────────────────────────────┘
```
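Core responsibilities:
- Data protocol definitions: `__init__.py` defines every data structure exchanged between the engine and the frontend
- Schedule-execute loop: `EngineCore` in `core.py` implements the schedule → execute → update main loop
- Process management: `EngineCoreProc` wraps the engine in a child process and communicates over ZMQ
- Data parallelism: `DPEngineCoreProc` adds data-parallel support for MoE models
- Actor mode: `EngineCoreActor` / `DPMoEEngineCoreActor` provide Ray Actor wrappers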
1.2 File Structure
| File | Lines | Core content |
|---|---|---|
| `__init__.py` | 269 | Data definitions: enums, Structs, request/output formats |
| `core.py` | 2145 | EngineCore, EngineCoreProc, DPEngineCoreProc, Actor classes |
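II. `__init__.py` Line-by-Line Analysis
2.1 Imports (lines 1-17)
```python
import enum
import time
from collections.abc import Mapping
from dataclasses import dataclass
from typing import Any, Literal

import msgspec
import numpy as np
import torch
```
Line by line:
| Line | Import | Purpose |
|---|---|---|
| 1-2 | `enum`, `time` | Enum base classes + timestamps |
| 3 | `Mapping` | Type annotation for key-value mappings (used for trace_headers, etc.) |
| 4 | `dataclass` | Python dataclass decorator (used by EngineCoreReadyResponse) |
| 5 | `Any`, `Literal` | `Any` = any type; `Literal` = restricts a value to the listed literals |
| 7 | `msgspec` | Key dependency: high-performance msgpack serialization library, used instead of pydantic/dataclasses on the hot IPC path |
| 8 | `numpy` | `np.ndarray` for the routed_experts field |
| 9 | `torch` | Tensors for prompt_embeds and pooling_output |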
```python
from vllm.lora.request import LoRARequest
from vllm.multimodal.inputs import MultiModalFeatureSpec
from vllm.pooling_params import PoolingParams
from vllm.sampling_params import SamplingParams
from vllm.v1.metrics.stats import PrefillStats, SchedulerStats
from vllm.v1.outputs import LogprobsLists, LogprobsTensors
from vllm.v1.serial_utils import UtilityResult
```
| Import | Purpose |
|---|---|
| LoRARequest | LoRA adapter request (dynamically loaded fine-tuned weights) |
| MultiModalFeatureSpec | Multimodal feature spec (image/audio/video inputs) |
| PoolingParams | Pooling parameters (for embedding models such as E5/BGE) |
| SamplingParams | Sampling parameters (temperature, top_p, top_k, etc.) |
| PrefillStats | Prefill-phase statistics |
| SchedulerStats | Scheduler statistics |
| LogprobsLists / LogprobsTensors | Log-probability outputs, in list and tensor forms |
| UtilityResult | Result of a generic utility call |
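2.2 Type Aliases and Constants (lines 19-26)
```python
# Type for pause_generation mode parameter.
# - "abort": Abort all in-flight requests immediately (default).
# - "wait": Wait for in-flight requests to complete before pausing.
# - "keep": Freeze requests in queue; they resume on resume_generation().
PauseMode = Literal["abort", "wait", "keep"]
```
PauseMode is the engine's pause mode. It takes one of three literals:
- "abort": immediately abort all in-flight requests. This is the default and the most aggressive option.
- "wait": wait for in-flight requests to finish before pausing. This is the gentlest option, and only EngineCoreProc supports it.
- "keep": freeze the queued requests. They continue when generation resumes via resume_generation(). This is the middle ground.
```python
# These are possible values of RequestOutput.finish_reason,
# so form part of the external API.
FINISH_REASON_STRINGS = ("stop", "length", "abort", "error", "repetition")
```
FINISH_REASON_STRINGS is the tuple of string forms of the finish reasons. It is part of the external API contract:
- "stop": a stop string was emitted
- "length": max_tokens was consumed, or max_model_len was reached
- "abort": aborted by the client
- "error": a retryable request-level internal error (always mapped to HTTP 500)
- "repetition": a repetitive token pattern was detected (hallucination detection)
```python
EEP_NOTIFICATION_CALL_ID = -1
```
EEP_NOTIFICATION_CALL_ID is the fixed call_id for Elastic EP (elastic expert parallelism) notifications. -1 is a reserved value. It marks the message as an internal engine notification rather than an ordinary utility call, so no response is awaited.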
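2.3 The EEPNotificationType Enum (lines 28-34)
```python
class EEPNotificationType(enum.Enum):
    NEW_CORE_ENGINES_INIT_READY = "NEW_CORE_ENGINES_INIT_READY"
    NEW_CORE_ENGINES_WEIGHTS_INIT_READY = "NEW_CORE_ENGINES_WEIGHTS_INIT_READY"
    RECONFIGURE_FINISHED = "RECONFIGURE_FINISHED"
    SHUTDOWN_COMPLETE = "SHUTDOWN_COMPLETE"
```
EEP (Elastic Expert Parallelism) notification types:
| Value | Meaning | Sender | Receiver |
|---|---|---|---|
| NEW_CORE_ENGINES_INIT_READY | New engine cores finished initializing | Newly added engines | Existing engines |
| NEW_CORE_ENGINES_WEIGHTS_INIT_READY | New engines finished loading weights | Newly added engines | EngineCoreClient |
| RECONFIGURE_FINISHED | Reconfiguration finished | Existing engines | EngineCoreClient |
| SHUTDOWN_COMPLETE | A scaled-down engine has shut down | Exiting engines | EngineCoreClient |

This is the coordination protocol for elastically scaling MoE models up and down:
- On scale-up, new engines tell the existing ones "I'm ready".
- On scale-down, exiting engines notify the client so it can release their resources.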
2.4 The FinishReason Enum (lines 37-58)
```python
class FinishReason(enum.IntEnum):
    """
    Reason a request finished - stop, length, abort, error, or repetition.

    Int rather than Str for more compact serialization.

    stop - a stop string was emitted
    length - max_tokens was consumed, or max_model_len was reached
    abort - aborted by client
    error - retryable request-level internal error (e.g., KV load failure).
            Invariant: always converted to 500 Internal Server Error.
    repetition - repetitive token pattern detected (hallucination)
    """

    STOP = 0
    LENGTH = 1
    ABORT = 2
    ERROR = 3
    REPETITION = 4

    def __str__(self):
        return FINISH_REASON_STRINGS[self.value]
```
Design points:
- IntEnum rather than a string enum: integers serialize more compactly than strings. A small integer is a single byte in msgpack, while a string needs several bytes. This saves bandwidth on a hot path, because an output is sent for every request on every step.
- The `__str__` method maps each value through `FINISH_REASON_STRINGS[self.value]`, so the external API keeps its string interface unchanged.
- REPETITION = 4: a hallucination guard that stops generation once the model falls into a repeating token loop.
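The following is a minimal, stdlib-only sketch of the pattern described above: an integer goes on the wire, and a string is exposed at the API. It copies FinishReason and FINISH_REASON_STRINGS as shown. It uses `json` rather than msgpack, purely to compare encoded sizes, so the byte counts apply to JSON, not to vLLM's actual msgpack encoding.

```python
import enum
import json

FINISH_REASON_STRINGS = ("stop", "length", "abort", "error", "repetition")


class FinishReason(enum.IntEnum):
    # Integer values travel over the wire; strings exist only for the API.
    STOP = 0
    LENGTH = 1
    ABORT = 2
    ERROR = 3
    REPETITION = 4

    def __str__(self) -> str:
        # Index into the tuple so the public string form stays stable.
        return FINISH_REASON_STRINGS[self.value]


reason = FinishReason.LENGTH
wire_int = json.dumps(reason)       # an IntEnum serializes as a plain int
wire_str = json.dumps(str(reason))  # what a string-valued enum would send
print(wire_int, wire_str)           # 1 "length"
print(len(wire_int), len(wire_str))  # 1 8
```

The receiving side can rebuild the member with `FinishReason(1)` and render it with `str()`. The compact form therefore costs nothing at the API boundary.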
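2.5 EngineCoreReadyResponse (lines 60-68)
```python
@dataclass
class EngineCoreReadyResponse:
    """Sent from EngineCore to each frontend at the end of engine startup.

    Contains post-initialization config that may differ from the original
    values (e.g. max_model_len after KV cache auto-fitting).
    """

    max_model_len: int
    num_gpu_blocks: int
    dp_stats_address: str | None
```
This is the startup-ready response. It is sent to every frontend once engine initialization completes:
| Field | Type | Meaning |
|---|---|---|
| max_model_len | `int` | Final maximum model length (may have been reduced by KV cache auto-fitting) |
| num_gpu_blocks | `int` | Number of KV cache blocks on the GPU |
| dp_stats_address | `str \| None` | Address where data-parallel stats are published (only meaningful when DP > 1) |

Why a dataclass instead of msgspec.Struct? This response is sent only once, at startup, and never travels over the high-frequency IPC path. Its serialization speed therefore does not matter.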
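2.6 EngineCoreRequest (lines 70-126)
```python
class EngineCoreRequest(
    msgspec.Struct,
    array_like=True,  # type: ignore[call-arg]
    omit_defaults=True,  # type: ignore[call-arg]
    gc=False,
):  # type: ignore[call-arg]
```
msgspec.Struct class keyword arguments:
| Parameter | Value | Meaning |
|---|---|---|
| array_like=True | enabled | Encodes the struct as a positional array instead of a map keyed by field names, so field names are never transmitted |
| omit_defaults=True | enabled | Omits fields that still hold their default value when serializing, which shrinks the payload (for array-like structs, only trailing defaults can be dropped) |
| gc=False | disabled | Instances are not tracked by Python's cyclic garbage collector, which reduces GC overhead; this is safe only because these structs never form reference cycles |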
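The sketch below makes the effect of array_like and omit_defaults concrete without depending on msgspec. It is a hypothetical pure-Python encoder, `encode_struct`, that mimics the idea: fields are emitted positionally, and trailing fields that still hold their defaults are dropped. `MiniRequest` is a made-up, trimmed-down stand-in for EngineCoreRequest. JSON is used in place of msgpack. This illustrates the encoding idea only; it is not msgspec's implementation.

```python
import json
from dataclasses import dataclass, fields


@dataclass
class MiniRequest:
    # Hypothetical, trimmed-down stand-in for EngineCoreRequest.
    request_id: str
    prompt_token_ids: list
    client_index: int = 0
    current_wave: int = 0
    priority: int = 0


def encode_struct(obj, array_like: bool, omit_defaults: bool) -> str:
    """Hypothetical encoder illustrating array_like / omit_defaults."""
    # Required fields have f.default == dataclasses.MISSING, never equal to a value.
    items = [(f.name, getattr(obj, f.name), f.default) for f in fields(obj)]
    if array_like:
        values = [value for _, value, _ in items]
        if omit_defaults:
            # Positions carry meaning in an array, so only *trailing* defaults go.
            while items and items[-1][1] == items[-1][2]:
                items.pop()
                values.pop()
        return json.dumps(values, separators=(",", ":"))
    mapping = {
        name: value
        for name, value, default in items
        if not (omit_defaults and value == default)
    }
    return json.dumps(mapping, separators=(",", ":"))


req = MiniRequest("req-0", [1, 2, 3])
print(encode_struct(req, array_like=False, omit_defaults=False))
print(encode_struct(req, array_like=True, omit_defaults=True))  # ["req-0",[1,2,3]]
print(encode_struct(MiniRequest("r", [1], priority=5), True, True))  # ["r",[1],0,0,5]
```

The last call shows the positional constraint. Because `priority` is non-default, the two default fields before it must still be sent so that the decoder can tell which position is which.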
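Field by field:

`request_id: str`
Unique request identifier, assigned by InputProcessor.assign_request_id().

`prompt_token_ids: list[int] | None`
Input token IDs. None means a pure-embedding request (prompt_embeds only, no tokens).

`mm_features: list[MultiModalFeatureSpec] | None`
List of multimodal features. Image, audio, and video inputs are preprocessed into feature specs.

`sampling_params: SamplingParams | None`
`pooling_params: PoolingParams | None`
Sampling parameters or pooling parameters; exactly one of the two is used. Sampling is for generative models, and pooling is for embedding models.

`arrival_time: float`
Timestamp at which the request arrived. It is used for scheduling order and for statistics.

`lora_request: LoRARequest | None`
LoRA request. It is set when the request must use a specific LoRA adapter.

`cache_salt: str | None`
Cache salt. It is mixed into the prefix-caching block hashes, so requests with different salts never share cached prefixes. This isolates the prefix cache between users or tenants.

`data_parallel_rank: int | None`
Data-parallel rank: which DP rank should handle this request.

`prompt_embeds: torch.Tensor | None = None`
Precomputed prompt embedding tensor. It is provided when the embeddings API or mixed input mode is used.

`prompt_is_token_ids: list[bool] | None = None`
Mixed-mode mask. It marks each position as a real token (True) or a precomputed embedding (False):
- Pure token request: None
- Pure embedding request: None
- Mixed request: for example, [True, True, False, False, True] means the first 2 positions are tokens, the middle 2 are embeddings, and the last one is a token

`client_index: int = 0`
Client index. When several frontends connect to one engine, it routes outputs back to the correct client.

`current_wave: int = 0`
DP wave number. In data-parallel deployments, this is the scheduling wave the request is expected to belong to. It resolves the race in which a request arrives before the wave-complete notification.

`priority: int = 0`
Request priority (0 by default). It only takes effect with the priority scheduling policy, where a lower value is scheduled earlier.

`trace_headers: Mapping[str, str] | None`
Distributed tracing headers (OpenTelemetry, etc.) for end-to-end tracing.

`resumable: bool = False`
Whether the request is resumable. It marks that the request can be resumed after preemption rather than recomputed.

`external_req_id: str | None = None`
The original, user-supplied request ID. request_id is the internal ID, while external_req_id is the user's ID. This supports the abort(req_id, internal=False) case.

`reasoning_ended: bool | None = None`
Whether the reasoning/thinking phase has already ended (for o1-style reasoning models).

`reasoning_parser_kwargs: dict[str, Any] | None = None`
Extra arguments for the reasoning parser, used to customize how reasoning tokens are parsed.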
The params property:
```python
@property
def params(self) -> SamplingParams | PoolingParams:
    """Return the processed params (sampling or pooling)."""
    if self.sampling_params is not None:
        return self.sampling_params
    assert self.pooling_params is not None
    return self.pooling_params
```
This convenience property gives uniform access to the parameters. It returns sampling_params when set; otherwise it asserts that pooling_params is set and returns that.
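2.7 EngineCoreEventType and EngineCoreEvent (lines 128-152)
```python
class EngineCoreEventType(enum.IntEnum):
    """The type of engine core request event."""

    QUEUED = 1
    SCHEDULED = 2
    PREEMPTED = 3
```
Event types are the lifecycle events of a request inside the engine core:
- QUEUED=1: the request entered the queue
- SCHEDULED=2: the request was scheduled for execution
- PREEMPTED=3: the request was preempted (when KV cache runs short, lower-priority requests give way)
```python
class EngineCoreEvent(msgspec.Struct):
    """A timestamped engine core event associated with a request."""

    type: EngineCoreEventType
    timestamp: float

    @classmethod
    def new_event(
        cls, event_type: EngineCoreEventType, timestamp: float | None = None
    ) -> "EngineCoreEvent":
        timestamp = time.monotonic() if timestamp is None else timestamp
        return cls(event_type, timestamp)
```
Design points:
- Monotonic clock (time.monotonic): unaffected by system clock adjustments, so it suits measuring intervals
- Factory method: new_event() supplies a default timestamp, which simplifies call sites
- Process-local validity: the source comments state that these timestamps should not be compared across processes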
2.8 EngineCoreOutput (lines 154-187)
```python
class EngineCoreOutput(
    msgspec.Struct,
    array_like=True,
    omit_defaults=True,
    gc=False,
):
    request_id: str
    new_token_ids: list[int]
```
This is the core output structure; one is produced per request per step:
| Field | Type | Meaning |
|---|---|---|
| request_id | `str` | ID of the corresponding request |
| new_token_ids | `list[int]` | Token IDs generated in this step (possibly several, e.g. with speculative decoding) |

`new_logprobs: LogprobsLists | None = None`
`new_prompt_logprobs_tensors: LogprobsTensors | None = None`
- new_logprobs: log-probabilities of the new tokens (list form, easy to handle in Python)
- new_prompt_logprobs_tensors: log-probabilities of the prompt tokens (tensor form, efficient)
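`pooling_output: torch.Tensor | None = None`
Pooling output: the output vector of an embedding model.

`finish_reason: FinishReason | None = None`
`stop_reason: int | str | None = None`
- finish_reason: the finish-reason enum (None means the request has not finished)
- stop_reason: the specific stop cause, either the stop token ID (int) or the stop string

`events: list[EngineCoreEvent] | None = None`
List of request lifecycle events (for observability and debugging).

`kv_transfer_params: dict[str, Any] | None = None`
KV transfer parameters, used to migrate KV cache between engines (disaggregated prefill).

`trace_headers: Mapping[str, str] | None = None`
Tracing headers: passes the request's tracing information through.

`prefill_stats: PrefillStats | None = None`
Prefill statistics. These are set only on the step where prefill completes.

`routed_experts: np.ndarray | None = None`
Routed expert indices: an array of the expert IDs an MoE model activated in this step (for analysis and debugging).

`num_nans_in_logits: int = 0`
Number of NaNs in the logits. A value above 0 means the output is corrupted, which makes this a key health indicator.
```python
@property
def finished(self) -> bool:
    return self.finish_reason is not None
```
The finished property returns True only when finish_reason is not None.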
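2.9 UtilityOutput (lines 189-197)
```python
class UtilityOutput(
    msgspec.Struct,
    array_like=True,
    gc=False,
):
    call_id: int
    failure_message: str | None = None
    result: UtilityResult | None = None
```
Utility call output, used for generic RPC calls between the engine and the frontend:
| Field | Meaning |
|---|---|
| call_id | Call ID that matches a response to its request |
| failure_message | Non-None means the call failed |
| result | Call result (on success) |

Design pattern: failure_message and result are mutually exclusive. Either the call succeeded and carries a result, or it failed and carries a message.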
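The sketch below shows how a client could match UtilityOutput responses to pending calls by call_id, treating failure_message and result as mutually exclusive. `UtilityOutputSketch`, `PendingCalls`, and their methods are hypothetical names; this is not vLLM's EngineCoreClient code. Messages carrying the reserved EEP_NOTIFICATION_CALL_ID (-1) are skipped, since notifications have no waiting caller.

```python
import itertools
from concurrent.futures import Future
from dataclasses import dataclass
from typing import Any, Optional

EEP_NOTIFICATION_CALL_ID = -1  # reserved: notifications expect no response


@dataclass
class UtilityOutputSketch:
    # Hypothetical stand-in for UtilityOutput.
    call_id: int
    failure_message: Optional[str] = None
    result: Any = None


class PendingCalls:
    """Hypothetical client-side registry of in-flight utility calls."""

    def __init__(self) -> None:
        self._ids = itertools.count(0)  # non-negative, never collides with -1
        self._futures: dict = {}

    def new_call(self) -> tuple:
        call_id = next(self._ids)
        future: Future = Future()
        self._futures[call_id] = future
        return call_id, future

    def on_output(self, out: UtilityOutputSketch) -> None:
        if out.call_id == EEP_NOTIFICATION_CALL_ID:
            return  # internal notification; nobody is waiting on it
        future = self._futures.pop(out.call_id)
        if out.failure_message is not None:
            future.set_exception(RuntimeError(out.failure_message))
        else:
            future.set_result(out.result)


calls = PendingCalls()
cid_ok, fut_ok = calls.new_call()
cid_bad, fut_bad = calls.new_call()
calls.on_output(UtilityOutputSketch(cid_ok, result={"num_blocks": 42}))
calls.on_output(UtilityOutputSketch(cid_bad, failure_message="boom"))
calls.on_output(UtilityOutputSketch(EEP_NOTIFICATION_CALL_ID))
print(fut_ok.result())  # {'num_blocks': 42}
```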
2.10 EngineCoreOutputs (lines 199-236)
```python
class EngineCoreOutputs(
    msgspec.Struct,
    array_like=True,
    omit_defaults=True,
    gc=False,
):
    engine_index: int = 0
    outputs: list[EngineCoreOutput] = []
    scheduler_stats: SchedulerStats | None = None
    timestamp: float = 0.0
    utility_output: UtilityOutput | None = None
    finished_requests: set[str] | None = None
    wave_complete: int | None = None
    start_wave: int | None = None

    def __post_init__(self):
        if self.timestamp == 0.0:
            self.timestamp = time.monotonic()
```
The engine core output collection holds all outputs of one step:
| Field | Meaning |
|---|---|
| engine_index | Engine index (distinguishes engines under DP) |
| outputs | Per-request outputs |
| scheduler_stats | Scheduler statistics |
| timestamp | Timestamp (filled with the monotonic clock by `__post_init__`) |
| utility_output | Utility call response (if this step carries one) |
| finished_requests | Set of finished request IDs (lets the frontend clean up its state) |
| wave_complete | DP wave completion signal |
| start_wave | Signals that a request arrived for an old wave, so a new wave must be started |

Note `__post_init__`: msgspec.Struct also supports this dataclass-style hook. It runs after construction and fills in the timestamp automatically.
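The `__post_init__` pattern can be sketched with a stdlib dataclass in place of msgspec.Struct. A sentinel default of 0.0 means "stamp me at construction", while an explicit timestamp is preserved. `OutputsSketch` is a hypothetical stand-in for EngineCoreOutputs. It uses `field(default_factory=list)` instead of the `= []` default, because a dataclass rejects a mutable list default.

```python
import time
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class OutputsSketch:
    # Hypothetical stand-in for EngineCoreOutputs.
    engine_index: int = 0
    outputs: list = field(default_factory=list)
    timestamp: float = 0.0
    wave_complete: Optional[int] = None

    def __post_init__(self) -> None:
        # 0.0 means "not set": fill in the monotonic clock after construction.
        if self.timestamp == 0.0:
            self.timestamp = time.monotonic()


before = time.monotonic()
auto = OutputsSketch(engine_index=1)
explicit = OutputsSketch(timestamp=123.5)
print(auto.timestamp >= before, explicit.timestamp)  # True 123.5
```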
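2.11 The EngineCoreRequestType Enum (lines 238-249)
```python
class EngineCoreRequestType(enum.Enum):
    """Request types defined as hex byte strings, so it can be sent over
    sockets without separate encoding step."""

    ADD = b"\x00"
    ABORT = b"\x01"
    START_DP_WAVE = b"\x02"
    UTILITY = b"\x03"
    EXECUTOR_FAILED = b"\x04"
    WAKEUP = b"\x05"
```
These are the ZMQ frame protocol types. Each is encoded as a single byte, so parsing is essentially free:
| Value | Type | Meaning |
|---|---|---|
| `b"\x00"` | ADD | Add a new request |
| `b"\x01"` | ABORT | Abort requests |
| `b"\x02"` | START_DP_WAVE | Start a new DP wave |
| `b"\x03"` | UTILITY | Generic RPC call |
| `b"\x04"` | EXECUTOR_FAILED | Executor failure signal (internal sentinel) |
| `b"\x05"` | WAKEUP | Wake up an idle engine (used during shutdown) |

Why bytes rather than integers? In ZMQ multipart messages, this byte is sent directly as the first frame. It identifies the message type without any extra encode/decode step.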
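Here is a minimal sketch of single-byte type dispatch over multipart frames. Frames are modeled as a plain list of bytes rather than a real ZMQ socket. The `dispatch` function, the handler table, and the payloads are hypothetical; only the enum values come from the code above.

```python
import enum


class EngineCoreRequestType(enum.Enum):
    ADD = b"\x00"
    ABORT = b"\x01"
    START_DP_WAVE = b"\x02"
    UTILITY = b"\x03"
    EXECUTOR_FAILED = b"\x04"
    WAKEUP = b"\x05"


def dispatch(frames: list, handlers: dict) -> str:
    """Hypothetical input-loop step: frame 0 is the type, the rest is payload."""
    type_frame, *payload = frames
    # Enum lookup by value: the raw byte maps straight to a member, no decoding.
    request_type = EngineCoreRequestType(type_frame)
    return handlers[request_type](payload)


handlers = {
    EngineCoreRequestType.ADD: lambda p: f"add {p[0].decode()}",
    EngineCoreRequestType.ABORT: lambda p: f"abort {p[0].decode()}",
    EngineCoreRequestType.WAKEUP: lambda p: "wakeup",
}

print(dispatch([b"\x00", b"req-1"], handlers))  # add req-1
```

An unknown type byte makes `EngineCoreRequestType(...)` raise ValueError, so malformed frames fail loudly instead of being misrouted.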
2.12 ReconfigureDistributedRequest (lines 251-261)
```python
class ReconfigureDistributedRequest(msgspec.Struct):
    new_data_parallel_size: int
    new_data_parallel_rank: int
    new_data_parallel_rank_local: int
    new_data_parallel_master_ip: str
    new_data_parallel_master_port: int
    new_data_parallel_master_port_list: list[int]
    coord_store_port: int
```
Distributed reconfiguration request: the configuration update sent during Elastic EP scale-up or scale-down:
| Field | Meaning |
|---|---|
| new_data_parallel_size | New DP size |
| new_data_parallel_rank | New global DP rank |
| new_data_parallel_rank_local | New local DP rank |
| new_data_parallel_master_ip | New DP master IP |
| new_data_parallel_master_port | New DP master port |
| new_data_parallel_master_port_list | Port list for all DP ranks |
| coord_store_port | Coordinator store port |
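2.13 The ReconfigureRankType Enum (lines 263-269)
```python
class ReconfigureRankType(enum.IntEnum):
    """Rank type for reconfiguring distributed request."""

    KEEP_CURRENT_RANK = -1
    SHUTDOWN_CURRENT_RANK = -2
```
| Value | Meaning |
|---|---|
| KEEP_CURRENT_RANK = -1 | Keep the current rank unchanged |
| SHUTDOWN_CURRENT_RANK = -2 | Shut down the current rank (it exits on scale-down) |

Negative values are reserved so they can never be confused with real DP ranks (≥ 0).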
III. `core.py` Line-by-Line Analysis
3.1 Imports (lines 1-57)
```python
import gc
import os
import queue
import signal
import threading
import time
from collections import defaultdict, deque
from collections.abc import Callable, Generator
from concurrent.futures import Future
from contextlib import ExitStack, contextmanager
from enum import IntEnum
from functools import partial
from inspect import isclass, signature
from logging import DEBUG
from multiprocessing.queues import Queue
from typing import Any, TypeVar, cast
```
| Import | Purpose |
|---|---|
| gc | Freezing/unfreezing the GC heap (freeze_gc_heap / gc.unfreeze) |
| queue | Inter-thread queues (input_queue, aborts_queue) |
| signal | SIGTERM/SIGINT handling |
| threading | I/O thread management |
| deque | Batch queue (a bounded double-ended queue) |
| Future | Asynchronous results |
| ExitStack | Managing several context managers together |
| IntEnum | EngineShutdownState |
| partial | Partial function application (idle_state_callbacks) |
| isclass, signature | Runtime introspection (msgspec argument conversion) |
| Queue | Multiprocessing queue for tensor sharing |
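```python
import msgspec
import zmq
```
zmq (ZeroMQ) is the IPC transport between the engine and the frontend. Reasons for choosing ZMQ:
- Multiple socket patterns (DEALER/ROUTER/PUSH/PULL/XSUB/XPUB)
- Zero-copy message passing
- Automatic reconnection
- Works both across processes and across the network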
```python
import vllm.envs as envs
from vllm.config import ParallelConfig, VllmConfig
from vllm.distributed import stateless_destroy_torch_distributed_process_group
from vllm.envs import enable_envs_cache
from vllm.logger import init_logger
from vllm.logging_utils.dump_input import dump_engine_exception
from vllm.lora.request import LoRARequest
from vllm.multimodal import MULTIMODAL_REGISTRY
from vllm.tasks import POOLING_TASKS, SupportedTask
from vllm.tracing import instrument, maybe_init_worker_tracer
from vllm.transformers_utils.config import maybe_register_config_serialize_by_value
from vllm.utils import numa_utils
from vllm.utils.gc_utils import freeze_gc_heap, maybe_attach_gc_debug_callback
from vllm.utils.hashing import get_hash_fn_by_name
from vllm.utils.network_utils import make_zmq_socket
from vllm.utils.system_utils import decorate_logs, set_process_title
```
| Import | Purpose |
|---|---|
| VllmConfig | Global configuration object |
| stateless_destroy_torch_distributed_process_group | Destroys the DP process group statelessly |
| dump_engine_exception | Dumps the scheduler input when an exception occurs |
| MULTIMODAL_REGISTRY | Registry of multimodal processors |
| POOLING_TASKS / SupportedTask | Task type definitions |
| instrument | Distributed-tracing decorator |
| maybe_init_worker_tracer | Initializes the worker tracer |
| freeze_gc_heap | Freezes the GC heap (reduces GC pressure after startup) |
| get_hash_fn_by_name | Looks up a hash function by name (prefix caching) |
| make_zmq_socket | ZMQ socket factory |
| set_process_title | Sets the process title (makes monitoring easier) |
```python
from vllm.v1.core.kv_cache_utils import (
    BlockHash,
    generate_scheduler_kv_cache_config,
    get_kv_cache_configs,
    get_request_block_hasher,
    init_none_hash,
    resolve_kv_cache_block_sizes,
)
from vllm.v1.core.sched.interface import PauseState, SchedulerInterface
from vllm.v1.core.sched.output import SchedulerOutput
from vllm.v1.engine import (...)
from vllm.v1.engine.tensor_ipc import TensorIpcReceiver
from vllm.v1.engine.utils import (
    EngineHandshakeMetadata,
    EngineZmqAddresses,
    SignalCallback,
    get_device_indices,
)
from vllm.v1.executor import Executor
from vllm.v1.kv_cache_interface import KVCacheConfig
from vllm.v1.metrics.stats import SchedulerStats
from vllm.v1.outputs import ModelRunnerOutput
from vllm.v1.request import Request, RequestStatus
from vllm.v1.serial_utils import MsgpackDecoder, MsgpackEncoder
from vllm.v1.structured_output import StructuredOutputManager
from vllm.v1.utils import compute_iteration_details
from vllm.version import __version__ as VLLM_VERSION
```
Key dependency chain:
- SchedulerInterface → abstract scheduler interface
- Executor → abstract model executor interface
- Request / RequestStatus → internal request representation
- MsgpackDecoder / MsgpackEncoder → high-performance serialization
- StructuredOutputManager → constrained output (JSON schema, etc.)
- TensorIpcReceiver → inter-process tensor sharing
```python
logger = init_logger(__name__)

HANDSHAKE_TIMEOUT_MINS = 5

_R = TypeVar("_R")
```
- HANDSHAKE_TIMEOUT_MINS = 5: handshake timeout of 5 minutes
- _R: type variable for the return type of collective_rpc
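3.2 The EngineCore Class: Core Schedule-Execute Loop
3.2.1 The `__init__` Method (lines 61-155)
Pseudocode overview:
```text
EngineCore.__init__(vllm_config, executor_class, log_stats, ...):
    1. Load plugins
    2. Create the model executor
    3. [EEP] Elastic scale-up preparation
    4. Initialize the KV cache (including memory profiling)
    5. Create the structured output manager
    6. Create the scheduler
    7. Create the multimodal receiver cache
    8. [If a KV connector exists] Collect handshake metadata
    9. [Pipeline Parallel] Initialize the batch queue
    10. Initialize the request block hasher
    11. Choose the step function (plain / batch queue)
    12. Freeze the GC heap, enable the env-var cache
```
Section by section: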
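```python
def __init__(
    self,
    vllm_config: VllmConfig,
    executor_class: type[Executor],
    log_stats: bool,
    executor_fail_callback: Callable | None = None,
    include_finished_set: bool = False,
):
```
| Parameter | Meaning |
|---|---|
| vllm_config | Global configuration |
| executor_class | Executor class (a class, not an instance; it is instantiated later) |
| log_stats | Whether to record statistics |
| executor_fail_callback | Callback invoked when the executor fails |
| include_finished_set | Whether outputs include the set of finished request IDs |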
```python
from vllm.plugins import load_general_plugins

load_general_plugins()
```
Plugin loading: general plugins are loaded at the engine/scheduler level, so third parties can extend engine behavior.
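```python
self.vllm_config = vllm_config
if not vllm_config.parallel_config.data_parallel_rank_local:
    logger.info(
        "Initializing a V1 LLM engine (v%s) with config: %s",
        VLLM_VERSION,
        vllm_config,
    )
```
The initialization log is printed only when data_parallel_rank_local is 0 or None, that is, on local DP rank 0. This keeps DP deployments from printing the same log once per rank.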
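```python
self.log_stats = log_stats

self.model_executor = executor_class(vllm_config)
if executor_fail_callback is not None:
    self.model_executor.register_failure_callback(executor_fail_callback)
```
Create the executor and register the failure callback. Constructing the executor also loads the model and performs the rest of the executor's initialization.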
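```python
self.available_gpu_memory_for_kv_cache = -1
if envs.VLLM_ELASTIC_EP_SCALE_UP_LAUNCH:
    self._eep_scale_up_before_kv_init()
```
EEP scale-up preparation: when the engine is launched as part of an elastic scale-up, this preparation runs before KV cache initialization. The base class raises NotImplementedError, and DPEngineCoreProc overrides it.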
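```python
kv_cache_config = self._initialize_kv_caches(vllm_config)
self.structured_output_manager = StructuredOutputManager(vllm_config)
```
Initialize the KV cache and the structured output manager. KV cache initialization is the most important step and is analyzed in detail in 3.2.2.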
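```python
Scheduler = vllm_config.scheduler_config.get_scheduler_cls()
```
The scheduler class is resolved dynamically, so the configuration decides which scheduler implementation is used. Examples include Scheduler, AsyncScheduler when async scheduling is enabled, or a user-specified class.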
```python
if len(kv_cache_config.kv_cache_groups) == 0:
    if vllm_config.scheduler_config.enable_chunked_prefill:
        logger.warning("Disabling chunked prefill for model without KVCache")
        vllm_config.scheduler_config.enable_chunked_prefill = False
```
Models without KV cache: some encoder-only models need no KV cache. Chunked prefill must then be disabled, because it relies on block-based KV cache management.
```python
scheduler_block_size, hash_block_size = resolve_kv_cache_block_sizes(
    kv_cache_config, vllm_config
)
self.scheduler: SchedulerInterface = Scheduler(
    vllm_config=vllm_config,
    kv_cache_config=kv_cache_config,
    structured_output_manager=self.structured_output_manager,
    include_finished_set=include_finished_set,
    log_stats=self.log_stats,
    block_size=scheduler_block_size,
    hash_block_size=hash_block_size,
)
```
Creating the scheduler:
- scheduler_block_size: the block size the scheduler uses when allocating KV cache blocks
- hash_block_size: the block size used for prefix caching's per-block hashes
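```python
self.use_spec_decode = vllm_config.speculative_config is not None
if self.scheduler.connector is not None:
    self.model_executor.init_kv_output_aggregator(self.scheduler.connector)
```
Speculative decoding and the KV connector:
- Speculative decoding: a cheap proposer (for example, a small draft model) quickly generates draft tokens, and the target model then verifies them
- KV connector: if the scheduler has a KV connector (for sharing KV cache across engines), the executor's KV output aggregator is initialized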
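```python
mm_registry = MULTIMODAL_REGISTRY
self.mm_receiver_cache = mm_registry.engine_receiver_cache_from_config(
    vllm_config
)
```
Multimodal receiver cache: the engine-side cache for multimodal inputs. Items that were already received can be reused rather than transferred and processed again.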
```python
kv_connector = self.scheduler.get_kv_connector()
if kv_connector is not None:
    xfer_handshake_metadata = (
        self.model_executor.get_kv_connector_handshake_metadata()
    )
    if xfer_handshake_metadata:
        content: dict[int, Any] = {}
        for worker_dict in xfer_handshake_metadata:
            if worker_dict is not None:
                content.update(worker_dict)
        kv_connector.set_xfer_handshake_metadata(content)
```
KV transfer handshake:
- Collect the KV connector's transfer metadata from all workers
- Each worker returns a dict of the form {tp_rank: metadata}
- Merge every worker's metadata into a single dict
- Hand the result to the KV connector, so it has the full cross-worker context
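The merge step can be isolated as a small pure function. `merge_handshake_metadata` is a hypothetical name, and the metadata values are made-up strings. The logic mirrors the loop above: skip workers that returned None, and fold the rest into one dict keyed by TP rank.

```python
from typing import Any, Optional


def merge_handshake_metadata(per_worker: list) -> dict:
    """Hypothetical helper mirroring the merge loop in EngineCore.__init__."""
    content: dict = {}
    for worker_dict in per_worker:  # each item: Optional[dict[int, Any]]
        if worker_dict is not None:  # some workers may have nothing to report
            content.update(worker_dict)
    return content


merged = merge_handshake_metadata([{0: "meta-tp0"}, None, {1: "meta-tp1"}])
print(merged)  # {0: 'meta-tp0', 1: 'meta-tp1'}
```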
```python
self.batch_queue_size = self.model_executor.max_concurrent_batches
self.batch_queue: (
    deque[tuple[Future[ModelRunnerOutput], SchedulerOutput, Future[Any]]] | None
) = None
if self.batch_queue_size > 1:
    logger.debug("Batch queue is enabled with size %d", self.batch_queue_size)
    self.batch_queue = deque(maxlen=self.batch_queue_size)
```
The batch queue is a key optimization for Pipeline Parallelism:
```
┌───────────┐   ┌───────────┐   ┌───────────┐
│ Schedule  │ → │ Execute   │ → │ Update    │
│ Batch N+2 │   │ Batch N+1 │   │ Batch N   │
└───────────┘   └───────────┘   └───────────┘
```
When max_concurrent_batches > 1, scheduling and execution can overlap, which reduces pipeline bubbles. Each queue element is a (sample_future, scheduler_output, exec_future) triple.
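The queue discipline can be checked with a toy run. New batches are pushed with appendleft, the oldest is taken with pop from the right, and maxlen bounds the queue. Plain strings stand in for the (sample_future, scheduler_output, exec_future) triples, and nothing here runs a model. The queue size of 2 is an assumed example value.

```python
from collections import deque

batch_queue_size = 2  # assumed example value for max_concurrent_batches
batch_queue: deque = deque(maxlen=batch_queue_size)

completed = []
for step in range(4):
    # Schedule: the newest batch enters on the left.
    batch_queue.appendleft(f"batch-{step}")
    if len(batch_queue) < batch_queue_size:
        continue  # queue not full yet: keep scheduling instead of waiting
    # Collect: the oldest batch leaves on the right.
    completed.append(batch_queue.pop())

# Drain whatever is still in flight.
while batch_queue:
    completed.append(batch_queue.pop())

print(completed)  # ['batch-0', 'batch-1', 'batch-2', 'batch-3']
```

Outputs still come back in submission order (FIFO), even though a new batch is always scheduled before the previous one is collected.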
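```python
self.is_ec_consumer = (
    vllm_config.ec_transfer_config is None
    or vllm_config.ec_transfer_config.is_ec_consumer
)
self.is_pooling_model = vllm_config.model_config.runner_type == "pooling"
```
- is_ec_consumer: whether this engine consumes encoder cache (EC) in encoder-disaggregated deployments. It is also True when no ec_transfer_config is set, which covers ordinary engines.
- is_pooling_model: whether this is a pooling/embedding model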
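```python
self.request_block_hasher: Callable[[Request], list[BlockHash]] | None = None
if vllm_config.cache_config.enable_prefix_caching or kv_connector is not None:
    caching_hash_fn = get_hash_fn_by_name(
        vllm_config.cache_config.prefix_caching_hash_algo
    )
    init_none_hash(caching_hash_fn)
    self.request_block_hasher = get_request_block_hasher(
        hash_block_size, caching_hash_fn
    )
```
The request block hasher is the core of prefix caching. It is set up when prefix caching is enabled or a KV connector exists:
- Look up the configured hash function (for example, xxhash or sha256)
- Initialize none_hash (the precomputed hash used as the parent of the first block)
- Create request_block_hasher, which maps a request's token sequence to a list of block hashes

With prefix caching enabled, requests that share a prefix can share KV cache blocks and skip recomputing them.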
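The sketch below shows prefix-cache block hashing under one assumption: each full block's hash chains in its parent block's hash, so two equal hashes imply equal whole prefixes. `NONE_HASH`, `hash_blocks`, and hashing `repr()` with `hashlib.sha256` are illustrative choices, not vLLM's actual serialization. A trailing partial block is not hashed, because it cannot be shared yet.

```python
import hashlib

BLOCK_SIZE = 4
# Seed used as the first block's "parent"; stands in for the precomputed none_hash.
NONE_HASH = hashlib.sha256(b"none").hexdigest()


def hash_blocks(token_ids: list, block_size: int = BLOCK_SIZE) -> list:
    """Hypothetical chained hasher: one hash per *full* block of tokens."""
    hashes = []
    parent = NONE_HASH
    num_full = len(token_ids) // block_size
    for i in range(num_full):
        block = tuple(token_ids[i * block_size : (i + 1) * block_size])
        # Chaining the parent hash makes each hash depend on the entire prefix.
        parent = hashlib.sha256(repr((parent, block)).encode()).hexdigest()
        hashes.append(parent)
    return hashes


a = hash_blocks([1, 2, 3, 4, 5, 6, 7, 8, 9])     # 2 full blocks + 1 leftover token
b = hash_blocks([1, 2, 3, 4, 5, 6, 7, 0, 9, 9])  # shares only the first block
print(len(a), a[0] == b[0], a[1] == b[1])        # 2 True False
```

A cache keyed by these hashes would let request `b` reuse the KV blocks for `a`'s first block and recompute only from the second block onward.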
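```python
self.step_fn = (
    self.step if self.batch_queue is None else self.step_with_batch_queue
)
self.async_scheduling = vllm_config.scheduler_config.async_scheduling
```
Choosing the step function:
- Without a batch queue: step(), which schedules and executes synchronously
- With a batch queue: step_with_batch_queue(), which overlaps scheduling with execution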
```python
self.aborts_queue = queue.Queue[list[str]]()
self._idle_state_callbacks: list[Callable] = []
```
- aborts_queue: thread-safe queue of abort requests (input socket thread → core loop)
- _idle_state_callbacks: callbacks to run when the engine becomes idle (used by the pause/wait logic)
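```python
freeze_gc_heap()
maybe_attach_gc_debug_callback()
enable_envs_cache()
```
Three post-startup optimizations:
- Freeze the GC heap: moves the objects allocated during startup (model and KV cache bookkeeping, configs, etc.) into the permanent generation, so later GC passes do not rescan them
- Attach a GC debug callback: if enabled, prints extra debugging information during GC
- Enable the environment-variable cache: environment variables no longer change after startup, so lookups of vllm.envs values (which would otherwise call os.getenv() each time) can be cached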
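The startup-freeze idea can be observed with the standard `gc` module alone. The example below is a generic illustration of `gc.freeze()` and `gc.unfreeze()`. It is not the body of vLLM's freeze_gc_heap helper, which may do more.

```python
import gc

# Simulate long-lived objects created during startup (lists are always GC-tracked).
startup_objects = [[i, "weight-meta"] for i in range(10_000)]

gc.collect()  # clean up startup garbage first
gc.freeze()   # move every tracked object into the permanent generation
frozen = gc.get_freeze_count()
print(frozen > 0)  # True: later collections skip these objects

# ... the serving loop would run here; gc.collect() no longer scans them ...

gc.unfreeze()  # what EngineCore.shutdown() calls, making them collectable again
print(gc.get_freeze_count())  # 0
```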
3.2.2 The _initialize_kv_caches Method (lines 157-210)
Pseudocode:
```text
_initialize_kv_caches(vllm_config):
    1. Get the list of KV cache specs
    2. [Has KV cache]
       a. [EEP] Reuse the known GPU memory figure
       b. [Not EEP] Run memory profiling → determine available GPU memory
    3. [No KV cache] available = 0
    4. Compute the configuration of each KV cache group (block count, size)
    5. [Auto-fit] If max_model_len was reduced, sync it to the workers
    6. Generate the scheduler's KV cache config
    7. Update cache_config in vllm_config
    8. Initialize the KV cache and warm up
    9. Return the scheduler's KV cache config
```
```python
@instrument(span_name="Prepare model")
def _initialize_kv_caches(self, vllm_config: VllmConfig) -> KVCacheConfig:
    start = time.time()
    kv_cache_specs = self.model_executor.get_kv_cache_specs()
```
Get the KV cache specs: ask the executor for the KV cache requirements of every model layer (number of heads, head dimension, dtype, etc.).
```python
has_kv_cache = any(kv_cache_spec for kv_cache_spec in kv_cache_specs)
if has_kv_cache:
    if envs.VLLM_ELASTIC_EP_SCALE_UP_LAUNCH:
        assert self.available_gpu_memory_for_kv_cache > 0
        available_gpu_memory = [self.available_gpu_memory_for_kv_cache] * len(
            kv_cache_specs
        )
    else:
        available_gpu_memory = self.model_executor.determine_available_memory()
        self.available_gpu_memory_for_kv_cache = available_gpu_memory[0]
else:
    available_gpu_memory = [0] * len(kv_cache_specs)
```
Three-way branch for available memory:
| Scenario | Logic |
|---|---|
| EEP scale-up | Reuse the GPU memory value set beforehand (skip profiling) |
| Normal startup | Run determine_available_memory() (a dummy forward pass that measures peak memory) |
| No KV cache | Set to 0 directly |
```python
max_model_len_before = vllm_config.model_config.max_model_len
kv_cache_configs = get_kv_cache_configs(
    vllm_config, kv_cache_specs, available_gpu_memory
)
max_model_len_after = vllm_config.model_config.max_model_len
if max_model_len_after != max_model_len_before:
    self.collective_rpc("update_max_model_len", args=(max_model_len_after,))
```
Auto-fit detection: get_kv_cache_configs may reduce max_model_len automatically when GPU memory is insufficient. If that happens, the new value must be pushed to all workers via collective_rpc. The workers were started before memory profiling, so they still hold the original, larger value.
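Here is a back-of-the-envelope sketch of why auto-fitting can shrink max_model_len. It assumes a plain full-attention model in which every layer stores K and V for each token. Under that assumption, one block costs 2 × block_size × num_kv_heads × head_size × dtype_bytes per layer. The function names, the fitting rule, and the model shape (32 layers, 8 KV heads, head size 128, fp16) are hypothetical. Real vLLM sizing in get_kv_cache_configs handles many more cases.

```python
def kv_bytes_per_block(block_size, num_layers, num_kv_heads, head_size, dtype_bytes):
    # K and V (factor 2) for every token slot in the block, across all layers.
    return 2 * block_size * num_kv_heads * head_size * dtype_bytes * num_layers


def fit_max_model_len(available_bytes, requested_len, block_size, **shape):
    """Hypothetical auto-fit: cap max_model_len at what the cache can hold."""
    num_blocks = available_bytes // kv_bytes_per_block(block_size, **shape)
    capacity_tokens = num_blocks * block_size
    return num_blocks, min(requested_len, capacity_tokens)


shape = dict(num_layers=32, num_kv_heads=8, head_size=128, dtype_bytes=2)
# 2 GiB left for the KV cache after profiling (made-up figure).
num_blocks, max_len = fit_max_model_len(2 * 1024**3, 131072, 16, **shape)
print(num_blocks, max_len)  # 1024 16384
```

Each block costs 2 MiB here, so 2 GiB yields 1024 blocks, or 16,384 token slots. A requested 131,072-token context would not fit and gets capped. The workers would then have to learn the new limit via collective_rpc.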
```python
scheduler_kv_cache_config = generate_scheduler_kv_cache_config(kv_cache_configs)
vllm_config.cache_config.num_gpu_blocks = scheduler_kv_cache_config.num_blocks
kv_cache_groups = scheduler_kv_cache_config.kv_cache_groups
if kv_cache_groups:
    vllm_config.cache_config.block_size = min(
        g.kv_cache_spec.block_size for g in kv_cache_groups
    )
vllm_config.validate_block_size()
```
Config sync: the KV cache configuration is written back into vllm_config to keep everything consistent. block_size is set to the smallest block size across all KV cache groups.
```python
self.model_executor.initialize_from_config(kv_cache_configs)
```
Final initialization: allocate KV cache memory on the GPU according to the computed configuration, then run warmup.
3.2.3 The add_request / abort_requests Methods (lines 212-237)
```python
def add_request(self, request: Request, request_wave: int = 0):
    if not isinstance(request.request_id, str):
        raise TypeError(
            f"request_id must be a string, got {type(request.request_id)}"
        )
    if pooling_params := request.pooling_params:
        supported_pooling_tasks = [
            task for task in self.get_supported_tasks() if task in POOLING_TASKS
        ]
        if pooling_params.task not in supported_pooling_tasks:
            raise ValueError(...)
    if request.kv_transfer_params is not None and (
        not self.scheduler.get_kv_connector()
    ):
        logger.warning(
            "Got kv_transfer_params, but no KVConnector found. "
            "Disabling KVTransfer for this request."
        )
    self.scheduler.add_request(request)
```
Request admission logic:
- Type check: request_id must be a string
- Task check: a pooling request must ask for a pooling task that the model supports
- KV transfer fallback: if the request carries kv_transfer_params but no connector exists, a warning is logged instead of rejecting the request
- Finally: the request is delegated to the scheduler's add_request
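```python
def abort_requests(self, request_ids: list[str]):
    self.scheduler.finish_requests(request_ids, RequestStatus.FINISHED_ABORTED)
```
Aborting requests: the requests are marked finished with status FINISHED_ABORTED, and the scheduler handles them on its next update.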
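3.2.4 The log_error_detail / log_iteration_details Context Managers (lines 239-272)
```python
@contextmanager
def log_error_detail(self, scheduler_output: SchedulerOutput):
    try:
        yield
    except Exception as err:
        dump_engine_exception(
            self.vllm_config, scheduler_output, self.scheduler.make_stats()
        )
        raise err
```
Error detail dump: when model execution raises, the scheduler input is dumped to the log to help debugging, and the original exception is re-raised. dump_engine_exception is exception-safe, meaning it does not raise new exceptions.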
```python
@contextmanager
def log_iteration_details(self, scheduler_output: SchedulerOutput):
    if not self.vllm_config.observability_config.enable_logging_iteration_details:
        yield
        return
    ...
    iteration_details = compute_iteration_details(scheduler_output)
    before = time.monotonic()
    yield
    logger.info(
        "Iteration(N): M context requests, N context tokens, "
        "K generation requests, L generation tokens, elapsed: X ms"
    )
    self._iteration_index += 1
```
Iteration detail logging: when enabled, it times each step and logs request and token counts. Note that the yield sits in the middle. The start time is recorded first, the wrapped step runs at the yield, and the log line is written afterwards.
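The following is a self-contained sketch of the same timing context-manager pattern. When disabled, it yields immediately and returns. When enabled, it reads time.monotonic() before the yield and records the elapsed time after the wrapped block finishes. `iteration_logger`, the `records` list, and the message format are hypothetical.

```python
import time
from contextlib import contextmanager

records = []


@contextmanager
def iteration_logger(enabled: bool, num_requests: int):
    """Hypothetical analogue of EngineCore.log_iteration_details."""
    if not enabled:
        yield
        return
    before = time.monotonic()
    yield  # the model step runs here
    elapsed_ms = (time.monotonic() - before) * 1000
    records.append(f"{num_requests} requests, elapsed: {elapsed_ms:.1f} ms")


with iteration_logger(enabled=True, num_requests=3):
    time.sleep(0.01)  # stand-in for future.result()
with iteration_logger(enabled=False, num_requests=5):
    pass

print(records)
```

As in the original, the code after the yield is not in a `finally` block. If the step raises, no timing line is written, and the exception propagates to log_error_detail instead.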
3.2.5 The step Method (lines 274-298): The Core Main Loop
```python
def step(self) -> tuple[dict[int, EngineCoreOutputs], bool]:
    """Schedule, execute, and make output.

    Returns tuple of outputs and a flag indicating whether the model
    was executed.
    """
```
Return value: (outputs_dict, model_executed)
- outputs_dict: a {client_index: EngineCoreOutputs} mapping
- model_executed: whether the model actually ran in this step (there may be nothing to schedule)
```python
if not self.scheduler.has_requests():
    return {}, False
```
Fast path: if the scheduler has no requests at all, including running ones, an empty result is returned immediately.
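```python
scheduler_output = self.scheduler.schedule()
future = self.model_executor.execute_model(scheduler_output, non_block=True)
grammar_output = self.scheduler.get_grammar_bitmask(scheduler_output)
```
Three core operations:
- schedule(): the scheduler decides which requests run in this step and allocates their KV cache blocks
- execute_model(non_block=True): starts the model forward pass asynchronously and returns a Future
- get_grammar_bitmask(): builds the grammar bitmask for structured output (JSON schema constraints)

Note non_block=True: the executor starts asynchronously and does not block the scheduling loop. As a result, the grammar bitmask is computed while the forward pass runs.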
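```python
with (
    self.log_error_detail(scheduler_output),
    self.log_iteration_details(scheduler_output),
):
    model_output = future.result()
    if model_output is None:
        model_output = self.model_executor.sample_tokens(grammar_output)
```
Waiting for the model output, then sampling:
- future.result(): blocks until the forward pass finishes
- If model_output is None: the executor only ran the forward pass, so tokens are sampled in a separate sample_tokens() call. Some executors, such as the GPU one, split the forward pass and sampling.
- If model_output is not None: the executor already performed both the forward pass and sampling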
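```python
self._process_aborts_queue()
engine_core_outputs = self.scheduler.update_from_output(
    scheduler_output, model_output
)
return engine_core_outputs, scheduler_output.total_num_scheduled_tokens > 0
```
Post-processing:
- Process the abort queue (aborts that arrived while the model was running)
- update_from_output(): the scheduler updates its internal state from the model output. It appends tokens, checks stop conditions, and frees KV cache blocks.
- Return the outputs and whether the model ran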
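To trace the control flow of step() end to end, here is a toy version with fake collaborators. `FakeScheduler` and `FakeExecutor` are hypothetical stand-ins. They only mimic the method names used above: has_requests, schedule, get_grammar_bitmask, execute_model with non_block=True, sample_tokens, and update_from_output. The "model" returns a fixed token 7 for every scheduled request.

```python
from concurrent.futures import Future, ThreadPoolExecutor


class FakeScheduler:
    def __init__(self, request_ids):
        self.waiting = list(request_ids)

    def has_requests(self):
        return bool(self.waiting)

    def schedule(self):
        return list(self.waiting)  # schedule everything in one batch

    def get_grammar_bitmask(self, scheduler_output):
        return None  # no structured-output constraints in this toy

    def update_from_output(self, scheduler_output, model_output):
        self.waiting.clear()  # pretend every request finished this step
        return {0: model_output}  # {client_index: outputs}


class FakeExecutor:
    def __init__(self):
        self.pool = ThreadPoolExecutor(max_workers=1)
        self._last = []

    def execute_model(self, scheduler_output, non_block=True) -> Future:
        self._last = scheduler_output
        # Forward pass only; returning None signals "sample separately".
        return self.pool.submit(lambda: None)

    def sample_tokens(self, grammar_output):
        return {rid: [7] for rid in self._last}


def step(scheduler, executor):
    if not scheduler.has_requests():
        return {}, False
    scheduler_output = scheduler.schedule()
    future = executor.execute_model(scheduler_output, non_block=True)
    grammar_output = scheduler.get_grammar_bitmask(scheduler_output)  # overlaps
    model_output = future.result()
    if model_output is None:
        model_output = executor.sample_tokens(grammar_output)
    outputs = scheduler.update_from_output(scheduler_output, model_output)
    return outputs, len(scheduler_output) > 0


sched, execu = FakeScheduler(["a", "b"]), FakeExecutor()
first = step(sched, execu)
second = step(sched, execu)
execu.pool.shutdown()
print(first)   # ({0: {'a': [7], 'b': [7]}}, True)
print(second)  # ({}, False)
```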
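3.2.6 The post_step Method (lines 300-307)
```python
def post_step(self, model_executed: bool) -> None:
    if not self.async_scheduling and self.use_spec_decode and model_executed:
        draft_token_ids = self.model_executor.take_draft_token_ids()
        if draft_token_ids is not None:
            self.scheduler.update_draft_token_ids(draft_token_ids)
```
Post-step processing runs only when all three hold: synchronous scheduling, speculative decoding, and the model executed in this step.
- Take the draft token IDs from the executor
- Update the scheduler's draft token info (to be verified or accepted in the next schedule)

In async scheduling mode, the draft tokens are updated directly in the worker processes, without involving the main loop.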
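3.2.7 The step_with_batch_queue Method (lines 309-381): Pipeline Parallel Main Loop
```python
def step_with_batch_queue(
    self,
) -> tuple[dict[int, EngineCoreOutputs] | None, bool]:
```
Key differences:
- step(): synchronous; one call does everything
- step_with_batch_queue(): asynchronous and pipelines batches. It may return None, meaning a new batch was scheduled and enqueued but no output is ready yet.

Pseudocode:
```text
step_with_batch_queue():
    1. If the scheduler has requests and the queue is not full:
       a. Schedule → execute asynchronously → [optional] sample asynchronously
       b. Push (future, scheduler_output, exec_future) onto the left end of the queue
       c. If the model ran, the queue is not full, and the oldest batch is not done
          → return None (keep scheduling)
    2. If the scheduler has no requests and the queue is empty → return (None, False)
    3. Pop the oldest batch from the right end and wait for it to finish
    4. Process aborts, update the scheduler with the output
    5. Handle deferred sampling (structured output interacting with speculative decoding)
    6. return (outputs, model_executed)
```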
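Detailed analysis:
```python
model_executed = False
deferred_scheduler_output = None
if self.scheduler.has_requests():
    scheduler_output = self.scheduler.schedule()
    with self.log_error_detail(scheduler_output):
        exec_future = self.model_executor.execute_model(
            scheduler_output, non_block=True
        )
    if self.is_ec_consumer:
        model_executed = scheduler_output.total_num_scheduled_tokens > 0
```
EC consumer check: is_ec_consumer is True for ordinary engines (no ec_transfer_config) and for the consumer side of encoder-cache (EC) transfer. Only on such engines is a step with scheduled tokens counted as a model execution. On an EC producer, which only runs the encoder to produce encoder cache, model_executed stays False, so no token sampling is set up.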
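```python
if self.is_pooling_model or not model_executed:
    future = cast(Future[ModelRunnerOutput], exec_future)
else:
    if not scheduler_output.pending_structured_output_tokens:
        grammar_output = self.scheduler.get_grammar_bitmask(
            scheduler_output
        )
        future = self.model_executor.sample_tokens(
            grammar_output, non_block=True
        )
    else:
        deferred_scheduler_output = scheduler_output
```
Three-way sampling decision:
| Condition | Action |
|---|---|
| Pooling model, or nothing executed | Use exec_future directly |
| Executed, no pending structured-output tokens | Sample immediately |
| Executed, pending structured-output tokens | Defer sampling (deferred_scheduler_output) |

Why defer sampling? With speculative decoding plus structured output, the draft tokens must be validated before the grammar bitmask can be computed. The current step's draft tokens come from the previous step's output, so processing must wait until the previous step's model_output is available.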
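```python
if not deferred_scheduler_output:
    batch_queue.appendleft((future, scheduler_output, exec_future))
    if (
        model_executed
        and len(batch_queue) < self.batch_queue_size
        and not batch_queue[-1][0].done()
    ):
        return None, True
```
Fill the queue first: suppose the new batch has been enqueued, the queue still has room, and the oldest batch has not finished. In that case the method does not block on outputs; it returns so that another batch can be scheduled. Returning None means this call produced no output.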
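```python
future, scheduler_output, exec_model_fut = batch_queue.pop()
with (
    self.log_error_detail(scheduler_output),
    self.log_iteration_details(scheduler_output),
):
    model_output = future.result()
    if model_output is None:
        # sample_tokens() yielded None: surface execute_model()'s exception.
        exec_model_fut.result()
        raise RuntimeError("unexpected error")
```
Wait for the oldest batch: batch_queue.pop() removes the oldest batch from the right end. If sampling returned None, the original execute_model call must have failed, so exec_model_fut.result() re-raises its exception. The RuntimeError is only a fallback.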
```python
if deferred_scheduler_output:
    if self.use_spec_decode:
        draft_token_ids = self.model_executor.take_draft_token_ids()
        assert draft_token_ids is not None
        self.scheduler.update_draft_token_ids_in_output(
            draft_token_ids, deferred_scheduler_output
        )
    grammar_output = self.scheduler.get_grammar_bitmask(
        deferred_scheduler_output
    )
    future = self.model_executor.sample_tokens(grammar_output, non_block=True)
    batch_queue.appendleft((future, deferred_scheduler_output, exec_future))
```
Deferred sampling:
- With speculative decoding, first fetch the draft token IDs and write them into the deferred scheduler output
- Then compute the grammar bitmask
- Finally, sample asynchronously and enqueue the new batch
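The sketch below strips the batch-queue loop down to its scheduling and collection skeleton. It schedules and submits while the queue has room and the oldest batch is still running; otherwise it blocks on the oldest batch. Deferred sampling, aborts, and the EC and pooling branches are omitted. `ToyPipelinedEngine` and its thread-pool "executor" are hypothetical.

```python
import time
from collections import deque
from concurrent.futures import ThreadPoolExecutor


class ToyPipelinedEngine:
    """Hypothetical skeleton of step_with_batch_queue (no sampling details)."""

    def __init__(self, pending, queue_size=2):
        self.pending = deque(pending)  # stand-in for the scheduler
        self.batch_queue_size = queue_size
        self.batch_queue = deque(maxlen=queue_size)
        self.pool = ThreadPoolExecutor(max_workers=queue_size)

    def _run_batch(self, batch):
        time.sleep(0.05)  # pretend the pipeline stage takes a while
        return f"out-{batch}"

    def step_with_batch_queue(self):
        model_executed = False
        if self.pending and len(self.batch_queue) < self.batch_queue_size:
            batch = self.pending.popleft()
            future = self.pool.submit(self._run_batch, batch)
            model_executed = True
            self.batch_queue.appendleft(future)
            if (
                len(self.batch_queue) < self.batch_queue_size
                and not self.batch_queue[-1].done()
            ):
                return None, True  # keep filling the pipeline; don't block
        if not self.batch_queue:
            return None, False  # nothing scheduled and nothing in flight
        oldest = self.batch_queue.pop()  # right end = oldest batch
        return oldest.result(), model_executed


engine = ToyPipelinedEngine(["b0", "b1", "b2"])
results = []
while True:
    out, executed = engine.step_with_batch_queue()
    if out is None and not executed:
        break
    if out is not None:
        results.append(out)
engine.pool.shutdown()
print(results)  # ['out-b0', 'out-b1', 'out-b2']
```

Whatever the thread timing, outputs come back in submission order. The loop ends when the toy scheduler is empty and no batch is in flight, which mirrors the `(None, False)` exit in the pseudocode.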
3.2.8 Helper Methods (lines 383-430)
```python
def _process_aborts_queue(self):
    if not self.aborts_queue.empty():
        request_ids = []
        while not self.aborts_queue.empty():
            ids = self.aborts_queue.get_nowait()
            request_ids.extend((ids,) if isinstance(ids, str) else ids)
        self.abort_requests(request_ids)
```
Batched abort handling: all pending aborts in the queue are merged into a single abort_requests call, which is cheaper than handling them one at a time.
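The drain-and-batch behavior can be exercised directly with queue.Queue. `process_aborts_queue` and the `aborted_batches` list are hypothetical stand-ins for the method and for abort_requests(). Like the original, the sketch tolerates a bare string and wraps it in a one-element tuple.

```python
import queue

aborts_queue: "queue.Queue[list[str]]" = queue.Queue()
aborted_batches = []  # records each (hypothetical) abort_requests() call


def process_aborts_queue() -> None:
    if not aborts_queue.empty():
        request_ids = []
        while not aborts_queue.empty():
            ids = aborts_queue.get_nowait()
            # A str would be extended char by char, so wrap it in a tuple instead.
            request_ids.extend((ids,) if isinstance(ids, str) else ids)
        aborted_batches.append(request_ids)  # one batched abort call


aborts_queue.put(["req-1", "req-2"])
aborts_queue.put("req-3")
process_aborts_queue()
process_aborts_queue()  # queue is now empty: no extra call
print(aborted_batches)  # [['req-1', 'req-2', 'req-3']]
```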
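```python
def shutdown(self):
    self.structured_output_manager.clear_backend()
    if self.model_executor:
        self.model_executor.shutdown()
    if self.scheduler:
        self.scheduler.shutdown()
    gc.unfreeze()
```
Shutdown order: structured output → executor → scheduler → unfreeze GC. gc.unfreeze() is the key step. It makes the objects frozen at startup collectable again, which avoids leaking GPU memory when an engine is deleted in-process.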
```python
def profile(self, is_start: bool = True, profile_prefix: str | None = None):
    self.model_executor.profile(is_start, profile_prefix)
```
Profiling: delegated to the executor, with start/stop control.
3.2.9 缓存重置方法(440-465行)
def reset_mm_cache(self):
if self.scheduler.has_unfinished_requests():
logger.warning(...)
if self.mm_receiver_cache is not None:
self.mm_receiver_cache.clear_cache()
self.model_executor.reset_mm_cache()
def reset_prefix_cache(self, reset_running_requests=False, reset_connector=False) -> bool:
return self.scheduler.reset_prefix_cache(reset_running_requests, reset_connector)
def reset_encoder_cache(self) -> None:
if self.scheduler.has_unfinished_requests():
logger.warning(...)
self.scheduler.reset_encoder_cache()
self.model_executor.reset_encoder_cache()
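Three cache resets:
| Cache | Engine-side component | Executor side | Holds |
|---|---|---|---|
| MM cache | mm_receiver_cache | executor | Multimodal features |
| Prefix cache | scheduler | - | Shared KV cache prefixes |
| Encoder cache | scheduler | executor | Encoder outputs |
reset_mm_cache and reset_encoder_cache log a warning when unfinished requests exist, because resetting under load can leave the caches out of sync with in-flight requests. reset_prefix_cache simply delegates to the scheduler and returns its bool result.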
def _reset_caches(self, reset_running_requests=True) -> None:
self.reset_prefix_cache(reset_running_requests=reset_running_requests)
self.reset_mm_cache()
self.reset_encoder_cache()
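Unified reset: called by pause_scheduler when clear_cache=True (and therefore by sleep at level >= 1). It clears all three caches.
3.2.10 Pause / Resume / Sleep Methods (lines 467-531)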
def pause_scheduler(
self, mode: PauseMode = "abort", clear_cache: bool = True
) -> Future | None:
if mode not in ("keep", "abort", "wait"):
raise ValueError(f"Invalid pause mode: {mode}")
if mode == "wait":
raise ValueError("'wait' mode can't be used in inproc-engine mode")
if mode == "abort":
self.scheduler.finish_requests(None, RequestStatus.FINISHED_ABORTED)
pause_state = PauseState.PAUSED_ALL if mode == "keep" else PauseState.PAUSED_NEW
self.scheduler.set_pause_state(pause_state)
if clear_cache:
self._reset_caches()
return None
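pause_scheduler in EngineCore (in-process mode) does not support "wait":
| Mode | Behavior |
|---|---|
| "abort" | Abort all requests + PAUSED_NEW + clear caches (if clear_cache) |
| "wait" | ❌ Not supported (an in-process engine cannot wait for in-flight requests to drain) |
| "keep" | PAUSED_ALL + clear caches (if clear_cache); requests stay queued |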
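Sleep levels:
| Level | Effect |
|---|---|
| 0 | Pause scheduling only; GPU memory is untouched |
| 1 | Offload model weights to CPU and discard the KV cache |
| 2 | Discard all GPU memory (weights and KV cache) |
def sleep(self, level: int = 1, mode: PauseMode = "abort") -> None | Future: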
pause_future = self.pause_scheduler(mode=mode, clear_cache=level >= 1)
if level < 1:
return pause_future
model_executor = self.model_executor
if pause_future is None:
model_executor.sleep(level)
return None
future = Future[Any]()
def pause_complete(f: Future):
try:
f.result()
future.set_result(model_executor.sleep(level))
except Exception as e:
future.set_exception(e)
pause_future.add_done_callback(pause_complete)
return future
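Sleep flow: pause scheduling first, then let the executor release GPU memory. pause_future can be non-None when a subclass such as EngineCoreProc returns a Future because in-flight work must drain first. In that case the executor sleep is chained onto that Future and runs only after the pause completes, and any error from the pause is propagated to the returned Future.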
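The chaining pattern can be isolated in a minimal sketch that uses only concurrent.futures.Future. The name sleep_after_pause and the do_sleep callable are hypothetical. do_sleep stands in for model_executor.sleep(level).

```python
from concurrent.futures import Future
from typing import Any, Callable, Optional


def sleep_after_pause(pause_future: Optional[Future],
                      do_sleep: Callable[[], Any]) -> Optional[Future]:
    """Sleep now if the pause finished synchronously, otherwise once pause_future resolves."""
    if pause_future is None:
        do_sleep()
        return None
    result: Future = Future()

    def pause_complete(f: Future) -> None:
        try:
            f.result()  # re-raises any error from the pause step
            result.set_result(do_sleep())
        except Exception as e:
            result.set_exception(e)

    pause_future.add_done_callback(pause_complete)
    return result


log = []
pending = Future()
chained = sleep_after_pause(pending, lambda: log.append("sleep") or "slept")
print(chained.done(), log)      # False []
pending.set_result(None)        # the engine became idle
print(chained.result(), log)    # slept ['sleep']
```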
def wake_up(self, tags: list[str] | None = None):
if tags is not None and "scheduling" in tags:
tags = [t for t in tags if t != "scheduling"]
if tags is None or tags:
self.model_executor.wake_up(tags)
self.resume_scheduler()
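Wake-up flow: restore GPU memory first (if requested), then resume scheduling. The "scheduling" tag is stripped before the executor is called:
- tags=["scheduling"] resumes scheduling without touching GPU memory (the counterpart of a level-0 sleep).
- tags=None wakes everything.
3.2.11 LoRA / collective_rpc / preprocess Methods (lines 533-575)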
def add_lora(self, lora_request: LoRARequest) -> bool:
return self.model_executor.add_lora(lora_request)
def remove_lora(self, lora_id: int) -> bool:
return self.model_executor.remove_lora(lora_id)
def list_loras(self) -> set[int]:
return self.model_executor.list_loras()
def pin_lora(self, lora_id: int) -> bool:
return self.model_executor.pin_lora(lora_id)
LoRA proxy methods: all delegated to the executor. LoRA adapters are loaded and removed dynamically at runtime.
def collective_rpc(
self,
method: str | Callable[..., _R],
timeout: float | None = None,
args: tuple = (),
kwargs: dict[str, Any] | None = None,
) -> list[_R]:
return self.model_executor.collective_rpc(method, timeout, args, kwargs)
collective_rpc: runs the same method on every worker and returns the list of results. It is used for broadcast-style operations (e.g. update_max_model_len).
def preprocess_add_request(self, request: EngineCoreRequest) -> tuple[Request, int]:
if self.mm_receiver_cache is not None and request.mm_features:
request.mm_features = self.mm_receiver_cache.get_and_update_features(
request.mm_features
)
req = Request.from_engine_core_request(request, self.request_block_hasher)
if req.use_structured_output:
self.structured_output_manager.grammar_init(req)
return req, request.current_wave
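Request preprocessing runs in the input-processing thread, concurrently with the busy loop:
- Multimodal feature cache: fetch/update multimodal features through the receiver cache. This is thread-safe because only this thread accesses the cache.
- Conversion to the internal Request: Request.from_engine_core_request turns the EngineCoreRequest into the Request object used by the scheduler.
- Structured output initialization: if the request needs constrained output, grammar compilation is started. Compilation is asynchronous, and the scheduler later checks whether it has finished.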
3.2.12 EEP Methods (lines 577-583)
def _eep_scale_up_before_kv_init(self):
raise NotImplementedError
def _eep_send_engine_core_notification(
self,
notification_type: EEPNotificationType,
vllm_config: VllmConfig | None = None,
):
raise NotImplementedError
Base-class placeholders: overridden in DPEngineCoreProc.
3.3 EngineShutdownState Enum (lines 585-588)
class EngineShutdownState(IntEnum):
RUNNING = 0
REQUESTED = 1
SHUTTING_DOWN = 2
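Shutdown state machine:
RUNNING ──(SIGTERM/SIGINT)──→ REQUESTED ──(handled by busy loop)──→ SHUTTING_DOWN ──(no work left)──→ exit
| State | Meaning |
|---|---|
| RUNNING=0 | Normal operation |
| REQUESTED=1 | Shutdown signal received, not yet handled |
| SHUTTING_DOWN=2 | Draining or aborting in-flight requests |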
3.4 EngineCoreProc Class: The Subprocess Engine
3.4.1 __init__ Method (lines 591-700)
Pseudocode:
EngineCoreProc.__init__(vllm_config, local_client, handshake_address, ...):
1. Create input_queue and output_queue
2. Set up executor_fail_callback (→ input_queue.put EXECUTOR_FAILED)
3. Set engine_index and identity
4. [Optional] Initialize TensorIpcReceiver
5. Perform the ZMQ handshake → obtain addresses
6. Configure the data-parallel environment
7. [EEP] Send the init-ready notification
8. Call super().__init__() (EngineCore initialization)
9. Start the input/output socket threads
10. Wait for the DP coordinator to become ready
class EngineCoreProc(EngineCore):
ENGINE_CORE_DEAD = b"ENGINE_CORE_DEAD"
addresses: EngineZmqAddresses
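- ENGINE_CORE_DEAD: the engine-death signal bytes. When the output thread sees it, the thread sends it on every socket and exits.
- addresses: class-level annotation for the ZMQ address configuration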
@instrument(span_name="EngineCoreProc init")
def __init__(
self,
vllm_config: VllmConfig,
local_client: bool,
handshake_address: str,
executor_class: type[Executor],
log_stats: bool,
client_handshake_address: str | None = None,
tensor_queue: Queue | None = None,
*,
engine_index: int = 0,
):
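| Parameter | Meaning |
|---|---|
| local_client | Whether the frontend runs on the same machine |
| handshake_address | ZMQ address for the main handshake |
| client_handshake_address | Handshake address of the local frontend (DP>1 with hybrid LB) |
| tensor_queue | Queue for sharing tensors across processes |
| engine_index | Engine index (DP scenarios) |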
self.input_queue = queue.Queue[tuple[EngineCoreRequestType, Any]]()
self.output_queue = queue.Queue[tuple[int, EngineCoreOutputs] | bytes]()
executor_fail_callback = lambda: self.input_queue.put_nowait(
(EngineCoreRequestType.EXECUTOR_FAILED, b"")
)
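Two-queue architecture:
- input_queue: queue of (request_type, request_data) tuples
- output_queue: (client_index, EngineCoreOutputs) tuples, or the ENGINE_CORE_DEAD bytes
When the executor fails, the callback notifies the busy loop through input_queue.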
self.engine_index = engine_index
identity = self.engine_index.to_bytes(length=2, byteorder="little")
self.engines_running = False
self.shutdown_state = EngineShutdownState.RUNNING
ZMQ identity: engine_index encoded as 2 little-endian bytes. It is used as the identity of the DEALER socket.
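The encoding is plain int.to_bytes. The short sketch below uses engine_identity, a hypothetical helper name. It shows the byte layout, the round trip back to the index, and the consequence of a 2-byte identity: indices above 65535 cannot be encoded.

```python
def engine_identity(engine_index: int) -> bytes:
    # 2-byte little-endian encoding, as in EngineCoreProc.__init__.
    return engine_index.to_bytes(length=2, byteorder="little")


print(engine_identity(0), engine_identity(1), engine_identity(258))
# b'\x00\x00' b'\x01\x00' b'\x02\x01'
print(int.from_bytes(engine_identity(258), "little"))  # 258
```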
with self._perform_handshakes(
handshake_address, identity, local_client, vllm_config,
client_handshake_address,
) as addresses:
self.has_coordinator = addresses.coordinator_output is not None
self.frontend_stats_publish_address = (
addresses.frontend_stats_publish_address
)
internal_dp_balancing = (
self.has_coordinator
and not vllm_config.parallel_config.data_parallel_external_lb
)
self.publish_dp_lb_stats = internal_dp_balancing
self.addresses = addresses
self.process_input_queue_block = True
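Handshake results:
- has_coordinator: whether a DP coordinator exists
- publish_dp_lb_stats: whether to publish load statistics to the coordinator (only with internal DP load balancing)
- process_input_queue_block: whether input_queue.get() blocks (set to False during EEP scaling)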
if envs.VLLM_ELASTIC_EP_SCALE_UP_LAUNCH:
self._eep_send_engine_core_notification(
EEPNotificationType.NEW_CORE_ENGINES_INIT_READY,
vllm_config=vllm_config,
)
self._init_data_parallel(vllm_config)
EEP notification + DP initialization: both happen before super().__init__().
super().__init__(
vllm_config,
executor_class,
log_stats,
executor_fail_callback,
internal_dp_balancing,
)
Parent initialization: internal_dp_balancing is passed as include_finished_set, because internal DP load balancing needs to track the set of finished requests.
ready_event = threading.Event()
input_thread = threading.Thread(
target=self.process_input_sockets,
args=(addresses.inputs, addresses.coordinator_input, identity, ready_event),
daemon=True,
)
input_thread.start()
self.output_thread = threading.Thread(
target=self.process_output_sockets,
args=(addresses.outputs, addresses.coordinator_output, self.engine_index),
daemon=True,
)
self.output_thread.start()
while not ready_event.wait(timeout=10):
if not input_thread.is_alive():
raise RuntimeError("Input socket thread died during startup")
assert addresses.coordinator_input is not None
logger.info("Waiting for READY message from DP Coordinator...")
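I/O thread startup:
- Input thread: ZMQ socket → input_queue
- Output thread: output_queue → ZMQ socket
- Wait on ready_event: the input thread sets it once it is ready. With a DP coordinator, that includes receiving the coordinator's READY message. While waiting, the loop logs every 10 s and fails fast if the input thread dies.
3.4.2 Handshake Mechanism (lines 702-780)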
@contextmanager
def _perform_handshakes(
self, handshake_address, identity, local_client, vllm_config,
client_handshake_address,
) -> Generator[EngineZmqAddresses, None, None]:
input_ctx = zmq.Context()
is_local = local_client and client_handshake_address is None
headless = not local_client
handshake = self._perform_handshake(
input_ctx, handshake_address, identity, is_local, headless,
vllm_config, vllm_config.parallel_config,
)
if client_handshake_address is None:
with handshake as addresses:
yield addresses
else:
# Two handshakes: rank 0 + the local frontend
local_handshake = self._perform_handshake(
input_ctx, client_handshake_address, identity, True, False, vllm_config
)
with handshake as addresses, local_handshake as client_addresses:
addresses.inputs = client_addresses.inputs
addresses.outputs = client_addresses.outputs
yield addresses
vllm_config.__post_init__()
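Handshake scenarios:
| Scenario | Handshakes | Notes |
|---|---|---|
| DP=1 / offline | 1 | With the co-located frontend |
| DP>1 + internal LB | 1 | With the shared frontend |
| DP>1 + external/hybrid LB | 2 | Rank-0 frontend + local frontend |
In the second handshake, the inputs/outputs addresses obtained from the first handshake are replaced with those of the local frontend.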
@contextmanager
def _perform_handshake(
self, ctx, handshake_address, identity, local_client, headless,
vllm_config, parallel_config_to_update=None,
) -> Generator[EngineZmqAddresses, None, None]:
with make_zmq_socket(
ctx, handshake_address, zmq.DEALER, identity=identity,
linger=5000, bind=False,
) as handshake_socket:
addresses = self.startup_handshake(
handshake_socket, local_client, headless, parallel_config_to_update
)
yield addresses
ready_msg = {
"status": "READY",
"local": local_client,
"headless": headless,
}
if vllm_config.parallel_config.data_parallel_size > 1:
ready_msg["parallel_config_hash"] = (
vllm_config.parallel_config.compute_hash()
)
handshake_socket.send(msgspec.msgpack.encode(ready_msg))
Handshake flow:
Engine → Frontend: {"status": "HELLO", "local": ..., "headless": ...}
Frontend → Engine: EngineHandshakeMetadata (ZMQ addresses + parallel config)
... (engine initialization) ...
Engine → Frontend: {"status": "READY", "local": ..., "headless": ..., "parallel_config_hash": ...}
With DP>1, the READY message includes parallel_config_hash so the frontend can verify that the parallel configurations are consistent.
@staticmethod
def startup_handshake(
handshake_socket, local_client, headless, parallel_config=None,
) -> EngineZmqAddresses:
handshake_socket.send(
msgspec.msgpack.encode({"status": "HELLO", "local": local_client, "headless": headless})
)
if not handshake_socket.poll(timeout=HANDSHAKE_TIMEOUT_MINS * 60_000):
raise RuntimeError(...)
init_bytes = handshake_socket.recv()
init_message: EngineHandshakeMetadata = msgspec.msgpack.decode(
init_bytes, type=EngineHandshakeMetadata
)
if parallel_config is not None:
for key, value in init_message.parallel_config.items():
setattr(parallel_config, key, value)
return init_message.addresses
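Handshake details: send HELLO → wait up to HANDSHAKE_TIMEOUT_MINS (5 minutes) for a reply → decode the init message → apply its parallel_config fields via setattr → return the addresses.
3.4.3 run_engine_core Static Method (lines 782-830)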
@staticmethod
def run_engine_core(*args, dp_rank: int = 0, local_dp_rank: int = 0, **kwargs):
"""Launch EngineCore busy loop in background process."""
maybe_register_config_serialize_by_value()
engine_core: EngineCoreProc | None = None
signal_callback: SignalCallback | None = None
try:
vllm_config = kwargs["vllm_config"]
parallel_config = vllm_config.parallel_config
data_parallel = parallel_config.data_parallel_size > 1 or dp_rank > 0
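Entry point, invoked in the child process:
- maybe_register_config_serialize_by_value(): ensures the transformers config stays serializable after spawn
- Chooses the process title and engine type from the DP configuration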
if data_parallel and vllm_config.model_config.is_moe:
parallel_config.data_parallel_rank = dp_rank
engine_core = DPEngineCoreProc(*args, **kwargs)
else:
parallel_config.data_parallel_size = 1
parallel_config.data_parallel_size_local = 1
parallel_config.data_parallel_rank = 0
engine_core = EngineCoreProc(*args, engine_index=dp_rank, **kwargs)
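DP engine selection:
| Condition | Engine type | Notes |
|---|---|---|
| DP>1 + MoE | DPEngineCoreProc | Needs DP all-reduce synchronization |
| DP>1 + non-MoE | EngineCoreProc(engine_index=dp_rank) | Fully independent, treated as DP=1 |
| DP=1 | EngineCoreProc(engine_index=0) | Single engine |
Key insight: for non-MoE models, the DP ranks run completely independently with no synchronization. data_parallel_size is therefore reset to 1, while dp_rank is still passed on as engine_index.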
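The branch can be reduced to a pure function over a hypothetical ParallelSketch dataclass that carries only the three fields touched here. The sketch returns the class name rather than constructing an engine. ParallelSketch and select_engine are illustrative names, not vLLM APIs.

```python
from dataclasses import dataclass


@dataclass
class ParallelSketch:
    # Hypothetical stand-in for the relevant ParallelConfig fields.
    data_parallel_size: int
    data_parallel_size_local: int = 1
    data_parallel_rank: int = 0


def select_engine(cfg: ParallelSketch, dp_rank: int, is_moe: bool) -> str:
    """Mirror the branch in run_engine_core and return the engine class name."""
    data_parallel = cfg.data_parallel_size > 1 or dp_rank > 0
    if data_parallel and is_moe:
        cfg.data_parallel_rank = dp_rank  # MoE ranks stay coupled
        return "DPEngineCoreProc"
    # Non-MoE (or DP=1): each rank runs as an independent single engine.
    cfg.data_parallel_size = 1
    cfg.data_parallel_size_local = 1
    cfg.data_parallel_rank = 0
    return "EngineCoreProc"  # dp_rank would be passed as engine_index


print(select_engine(ParallelSketch(4, 2), dp_rank=2, is_moe=False))  # EngineCoreProc
```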
def wakeup_engine():
engine_core.input_queue.put_nowait((EngineCoreRequestType.WAKEUP, None))
signal_callback = SignalCallback(wakeup_engine)
def signal_handler(signum, frame):
engine_core.shutdown_state = EngineShutdownState.REQUESTED
signal_callback.trigger()
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
engine_core.run_busy_loop()
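Signal handling:
- SIGTERM/SIGINT → set the REQUESTED state and wake an idle engine
- SignalCallback runs wakeup_engine outside the signal-handler context, so the handler does minimal work and never touches input_queue directly
- Enter the busy loop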
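A signal handler can defer real work to another thread. Below is a minimal sketch of that pattern using threading.Event and a dedicated thread. DeferredSignalCallback is a hypothetical class, and whether vLLM's SignalCallback is structured exactly like this is an assumption. To keep the example side-effect free, the handler is called directly instead of being installed.

```python
import queue
import signal
import threading
from typing import Callable


class DeferredSignalCallback:
    """Hypothetical helper: trigger() only sets an Event; a dedicated thread runs the callback."""

    def __init__(self, callback: Callable[[], None]) -> None:
        self._callback = callback
        self._event = threading.Event()
        self._stopped = False
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self) -> None:
        self._event.wait()
        if not self._stopped:
            self._callback()  # takes queue locks here, not inside the signal handler

    def trigger(self) -> None:
        self._event.set()

    def stop(self) -> None:
        self._stopped = True
        self._event.set()
        self._thread.join(timeout=1.0)


input_queue: queue.Queue = queue.Queue()
state = {"shutdown": "RUNNING"}
signal_callback = DeferredSignalCallback(lambda: input_queue.put_nowait(("WAKEUP", None)))


def signal_handler(signum, frame) -> None:
    state["shutdown"] = "REQUESTED"  # cheap flag update
    signal_callback.trigger()        # wake the busy loop via the helper thread


# In the main thread this would be registered with:
#   signal.signal(signal.SIGTERM, signal_handler)
signal_handler(signal.SIGTERM, None)  # simulate a delivered SIGTERM
print(input_queue.get(timeout=2), state["shutdown"])  # ('WAKEUP', None) REQUESTED
```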
except Exception as e:
if engine_core is None:
logger.exception("EngineCore failed to start.")
else:
logger.exception("EngineCore encountered a fatal error.")
engine_core._send_engine_dead()
raise e
finally:
signal.signal(signal.SIGTERM, signal.SIG_DFL)
signal.signal(signal.SIGINT, signal.SIG_DFL)
if signal_callback is not None:
signal_callback.stop()
if engine_core is not None:
engine_core.shutdown()
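Exception handling: startup failures and runtime errors are logged differently. On a runtime error, the engine sends the ENGINE_CORE_DEAD signal before re-raising. The finally block restores the default signal handlers, stops the signal callback, and shuts the engine down.
3.4.4 run_busy_loop / _process_input_queue / _process_engine_step (lines 840-897)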
def has_work(self) -> bool:
return (
self.engines_running
or self.scheduler.has_requests()
or bool(self.batch_queue)
)
def is_running(self) -> bool:
return self.shutdown_state == EngineShutdownState.RUNNING
def run_busy_loop(self):
while self._handle_shutdown():
self._process_input_queue()
self._process_engine_step()
raise SystemExit
Busy loop logic:
while _handle_shutdown():
    _process_input_queue()   # handle input until there is work
    _process_engine_step()   # run one step and emit outputs
raise SystemExit
def _process_input_queue(self):
waited = False
while not self.has_work() and self.is_running():
self._notify_idle_state_callbacks()
if self.input_queue.empty():
with self.aborts_queue.mutex:
self.aborts_queue.queue.clear()
if logger.isEnabledFor(DEBUG):
logger.debug("EngineCore waiting for work.")
waited = True
block = self.process_input_queue_block
try:
req = self.input_queue.get(block=block)
self._handle_client_request(*req)
except queue.Empty:
break
if not block:
break
if waited:
logger.debug("EngineCore loop active.")
while not self.input_queue.empty():
req = self.input_queue.get_nowait()
self._handle_client_request(*req)
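Input handling:
- Wait for work: loop while has_work() is False, notifying the idle-state callbacks on each iteration.
- Clear the abort queue: when the engine is idle and input_queue is empty, aborts_queue is cleared. Every abort is also sent through input_queue, so these fast-path copies are redundant.
- Blocking vs. non-blocking reads: process_input_queue_block controls whether get() blocks.
- Drain the rest: after leaving the wait loop, every request still in the queue is handled in one pass.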
def _process_engine_step(self) -> bool:
outputs, model_executed = self.step_fn()
for output in outputs.items() if outputs else ():
self.output_queue.put_nowait(output)
self.post_step(model_executed)
if not model_executed and self.scheduler.has_unfinished_requests():
time.sleep(0.001)
return model_executed
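Engine step:
- Run the step (schedule + execute + update)
- Put the outputs on output_queue (the output thread forwards them to ZMQ)
- Post-step processing
- Key optimization: the model may not have run while unfinished requests remain (e.g. requests waiting for remote KV). In that case the loop sleeps 1 ms to release the GIL, so background threads (e.g. the NIXL handshake) can make progress.
3.4.5 _handle_shutdown / _handle_client_request (lines 899-970)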
def _handle_shutdown(self) -> bool:
if self.shutdown_state == EngineShutdownState.RUNNING:
return True
if self.shutdown_state == EngineShutdownState.REQUESTED:
shutdown_timeout = self.vllm_config.shutdown_timeout
if shutdown_timeout == 0:
aborted_reqs = self.scheduler.finish_requests(
None, RequestStatus.FINISHED_ABORTED
)
self._send_abort_outputs(aborted_reqs)
else:
logger.info("Draining %d in-flight requests (timeout=%ds)", ...)
self.shutdown_state = EngineShutdownState.SHUTTING_DOWN
if not self.has_work():
logger.info("Shutdown complete")
return False
return True
Shutdown state machine:
RUNNING       → return True (keep looping)
REQUESTED     → timeout=0: abort all requests immediately; timeout>0: drain requests; either way → SHUTTING_DOWN
SHUTTING_DOWN → return has_work() (False means exit)
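The state machine can be exercised in isolation with a minimal sketch. ShutdownSketch is hypothetical: in_flight stands in for the scheduler's unfinished requests. Enforcement of the drain timeout happens elsewhere and is not shown in the article, so it is omitted here.

```python
from enum import IntEnum


class EngineShutdownState(IntEnum):
    RUNNING = 0
    REQUESTED = 1
    SHUTTING_DOWN = 2


class ShutdownSketch:
    """Hypothetical reduction of _handle_shutdown: True means keep running the busy loop."""

    def __init__(self, shutdown_timeout: float, in_flight: int) -> None:
        self.shutdown_state = EngineShutdownState.RUNNING
        self.shutdown_timeout = shutdown_timeout
        self.in_flight = in_flight
        self.aborted = 0

    def has_work(self) -> bool:
        return self.in_flight > 0

    def handle_shutdown(self) -> bool:
        if self.shutdown_state == EngineShutdownState.RUNNING:
            return True
        if self.shutdown_state == EngineShutdownState.REQUESTED:
            if self.shutdown_timeout == 0:
                # Abort everything immediately.
                self.aborted, self.in_flight = self.in_flight, 0
            self.shutdown_state = EngineShutdownState.SHUTTING_DOWN
        # SHUTTING_DOWN: exit once no work remains (the caller raises SystemExit).
        return self.has_work()


s = ShutdownSketch(shutdown_timeout=0, in_flight=3)
s.shutdown_state = EngineShutdownState.REQUESTED  # what the signal handler does
print(s.handle_shutdown(), s.aborted)  # False 3
```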
def _handle_client_request(self, request_type, request):
if request_type == EngineCoreRequestType.WAKEUP:
return
elif request_type == EngineCoreRequestType.ADD:
req, request_wave = request
if self._reject_add_in_shutdown(req):
return
self.add_request(req, request_wave)
elif request_type == EngineCoreRequestType.ABORT:
self.abort_requests(request)
elif request_type == EngineCoreRequestType.UTILITY:
client_idx, call_id, method_name, args = request
if self._reject_utility_in_shutdown(client_idx, call_id, method_name):
return
output = UtilityOutput(call_id)
get_result = lambda: (
(method := getattr(self, method_name))
and method(*self._convert_msgspec_args(method, args))
)
enqueue_output = lambda out: self.output_queue.put_nowait(
(client_idx, EngineCoreOutputs(utility_output=out))
)
self._invoke_utility_method(method_name, get_result, output, enqueue_output)
elif request_type == EngineCoreRequestType.EXECUTOR_FAILED:
raise RuntimeError("Executor failed.")
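Request dispatch:
| Type | Handling |
|---|---|
| WAKEUP | Ignored (only used to wake an idle engine) |
| ADD | Rejected during shutdown, otherwise add_request (preprocessing already happened in the input thread) |
| ABORT | abort_requests |
| UTILITY | Rejected during shutdown, otherwise reflective method call → wrap the result |
| EXECUTOR_FAILED | Raise RuntimeError |
Utility RPC mechanism: getattr(self, method_name) looks up the method dynamically, so any engine method can be called remotely.
3.4.6 _invoke_utility_method / _convert_msgspec_args (lines 972-1003)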
@staticmethod
def _invoke_utility_method(name, get_result, output, enqueue_output):
try:
result = get_result()
if isinstance(result, Future):
callback = lambda future: EngineCoreProc._invoke_utility_method(
name, future.result, output, enqueue_output
)
result.add_done_callback(callback)
return
output.result = UtilityResult(result)
except Exception as e:
output.failure_message = f"Call to {name} method failed: {str(e)}"
enqueue_output(output)
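Recursive Future handling: if a utility method returns a Future, a done-callback re-invokes _invoke_utility_method with future.result as the getter. The reply is therefore enqueued only once the result is available. This is how asynchronous methods, such as pause_scheduler in process mode, are supported.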
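The reflective dispatch and the Future recursion can be combined in one minimal sketch. UtilityOutputSketch, ToyEngine, invoke_utility and handle_utility are hypothetical stand-ins for UtilityOutput, the engine and the two vLLM methods. The list sent plays the role of output_queue.

```python
from concurrent.futures import Future
from dataclasses import dataclass
from typing import Any, Callable, List, Optional


@dataclass
class UtilityOutputSketch:
    # Hypothetical stand-in for vLLM's UtilityOutput.
    call_id: int
    result: Any = None
    failure_message: Optional[str] = None


def invoke_utility(name: str, get_result: Callable[[], Any], output: UtilityOutputSketch,
                   enqueue: Callable[[UtilityOutputSketch], None]) -> None:
    try:
        result = get_result()
        if isinstance(result, Future):
            # Re-enter when the Future completes, using fut.result as the new getter.
            result.add_done_callback(lambda fut: invoke_utility(name, fut.result, output, enqueue))
            return
        output.result = result
    except Exception as e:
        output.failure_message = f"Call to {name} method failed: {e}"
    enqueue(output)


class ToyEngine:
    def __init__(self) -> None:
        self.pending: Future = Future()

    def add(self, a: int, b: int) -> int:
        return a + b

    def slow(self) -> Future:
        return self.pending  # completed later, e.g. when the engine goes idle


def handle_utility(engine: ToyEngine, call_id: int, method_name: str,
                   args: tuple, sent: List[UtilityOutputSketch]) -> None:
    # Reflective lookup, as in _handle_client_request: getattr(self, method_name).
    output = UtilityOutputSketch(call_id)
    invoke_utility(method_name, lambda: getattr(engine, method_name)(*args), output, sent.append)


sent: List[UtilityOutputSketch] = []
engine = ToyEngine()
handle_utility(engine, 1, "add", (2, 3), sent)
handle_utility(engine, 2, "no_such_method", (), sent)
handle_utility(engine, 3, "slow", (), sent)
print(len(sent))                 # 2: the "slow" reply is still pending
engine.pending.set_result("done")
print([o.result for o in sent])  # [5, None, 'done']
```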
@staticmethod
def _convert_msgspec_args(method, args):
if not args:
return args
arg_types = signature(method).parameters.values()
assert len(args) <= len(arg_types)
return tuple(
msgspec.convert(v, type=p.annotation)
if isclass(p.annotation)
and issubclass(p.annotation, msgspec.Struct)
and not isinstance(v, p.annotation)
else v
for v, p in zip(args, arg_types)
)
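Automatic msgspec conversion: runtime type inspection converts plain values into the msgspec.Struct type declared in the method signature. This is needed because msgpack decoding without a target type yields plain dicts, while the method signatures expect Structs.
3.4.7 _send_engine_dead (lines 1005-1017)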
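The same signature-driven conversion can be shown with dataclasses and inspect instead of msgspec, which keeps the sketch dependency-free. ReconfigRequest, convert_args and ToyEngine are hypothetical names. One difference from _convert_msgspec_args, which checks "not isinstance(v, annotation)" and calls msgspec.convert: this sketch only converts values that are dicts.

```python
from dataclasses import dataclass, is_dataclass
from inspect import isclass, signature
from typing import Any, Callable


@dataclass
class ReconfigRequest:
    # Hypothetical request type used only for this example.
    new_data_parallel_size: int
    new_data_parallel_rank: int


def convert_args(method: Callable[..., Any], args: tuple) -> tuple:
    """Turn plain dicts into the dataclass each parameter is annotated with."""
    if not args:
        return args
    params = list(signature(method).parameters.values())  # bound method: no self
    assert len(args) <= len(params)
    return tuple(
        p.annotation(**v)
        if isclass(p.annotation) and is_dataclass(p.annotation) and isinstance(v, dict)
        else v
        for v, p in zip(args, params)
    )


class ToyEngine:
    def reinitialize(self, req: ReconfigRequest, note: str = "") -> int:
        return req.new_data_parallel_size


engine = ToyEngine()
wire_args = ({"new_data_parallel_size": 4, "new_data_parallel_rank": 1}, "x")
converted = convert_args(engine.reinitialize, wire_args)
print(engine.reinitialize(*converted))  # 4
```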
def _send_engine_dead(self):
self.output_queue.put_nowait(EngineCoreProc.ENGINE_CORE_DEAD)
self.output_thread.join(timeout=5.0)
if self.output_thread.is_alive():
logger.fatal(
"vLLM shutdown signal from EngineCore failed to send."
)
Death signal: put ENGINE_CORE_DEAD on output_queue and wait for the output thread to send it and exit. If the thread is still alive after the 5-second timeout, the message was not sent (for example because the ZMQ connection is broken).
3.4.8 process_input_sockets: The Input Thread (lines 1019-1096)
Pseudocode:
process_input_sockets(input_addresses, coord_input_address, identity, ready_event):
1. Create add_request_decoder (with tensor IPC) and generic_decoder
2. Connect all input DEALER sockets + the coordinator XSUB socket
3. Send ready_payload (EngineCoreReadyResponse) on each input socket
4. [DP] Wait for the coordinator READY message
5. Set ready_event
6. Loop:
   a. ZMQ poll for messages
   b. Receive a multipart message: (type_frame, *data_frames)
   c. Deserialize according to request_type
   d. [ADD] preprocess_add_request → input_queue
   e. [ABORT] put on both aborts_queue and input_queue
   f. [Other] deserialize → input_queue
Key details:
add_request_decoder = MsgpackDecoder(
EngineCoreRequest, oob_tensor_provider=self.tensor_ipc_receiver
)
generic_decoder = MsgpackDecoder(oob_tensor_provider=self.tensor_ipc_receiver)
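Out-of-band tensor provider: oob_tensor_provider lets tensor data travel through a separate inter-process queue instead of being serialized into the msgpack payload. Tensors are therefore shared without being copied through the socket.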
ready_response = EngineCoreReadyResponse(
max_model_len=self.vllm_config.model_config.max_model_len,
num_gpu_blocks=self.vllm_config.cache_config.num_gpu_blocks or 0,
dp_stats_address=self.frontend_stats_publish_address,
)
ready_payload = msgspec.msgpack.encode(ready_response)
for input_socket in input_sockets:
input_socket.send(ready_payload)
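First message: an EngineCoreReadyResponse is sent on every input socket. It reports max_model_len, num_gpu_blocks and the DP stats address. It also satisfies the ZMQ ROUTER-DEALER pattern: the frontend's ROUTER socket only learns a DEALER's identity after that DEALER has sent it a message, so the DEALER must speak first.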
if request_type == EngineCoreRequestType.ABORT:
self.aborts_queue.put_nowait(request)
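Double-enqueued aborts: an abort is placed on two queues.
- aborts_queue lets the busy loop handle it as soon as possible between steps.
- input_queue preserves ordering relative to other requests and avoids leaking requests.
Aborts are idempotent in the scheduler, so handling one twice is safe.
3.4.9 process_output_sockets: The Output Thread (lines 1098-1159)
Pseudocode:
process_output_sockets(output_paths, coord_output_path, engine_index):
1. Create a MsgpackEncoder + a list of reusable buffers
2. Connect the PUSH socket(s) + the coordinator PUSH socket
3. Loop:
   a. Get an output from output_queue
   b. [ENGINE_CORE_DEAD] send it on every socket and exit
   c. Set engine_index on the outputs
   d. [client_index == -1] send to the coordinator
   e. [Otherwise] encode + zero-copy send on the corresponding client socket
   f. Manage buffer reuse
Zero-copy send optimization: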
reuse_buffers: list[bytearray] = []
pending = deque[tuple[zmq.MessageTracker, Any, bytearray]]()
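- reuse_buffers: buffers whose send has completed and can be reused
- pending: references to buffers ZMQ is still transmitting, kept alive so they are not freed or overwritten too early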
buffer = reuse_buffers.pop() if reuse_buffers else bytearray()
buffers = encoder.encode_into(outputs, buffer)
tracker = sockets[client_index].send_multipart(
buffers, copy=False, track=True
)
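encode_into encodes directly into a preallocated buffer, and copy=False + track=True makes the send zero-copy. The MessageTracker reports when ZMQ has finished sending, after which the buffer can be reused.
3.4.10 pause_scheduler (Process Mode) (lines 1161-1195)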
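The buffer bookkeeping can be simulated without ZMQ. In the sketch below, FakeTracker mimics only the done attribute of zmq.MessageTracker, and BufferPool is a hypothetical class. Pushing new sends on the left of pending and reclaiming finished ones from the right is an assumed ordering.

```python
from collections import deque
from typing import Any


class FakeTracker:
    """Stand-in for zmq.MessageTracker: done becomes True once the send completes."""

    def __init__(self) -> None:
        self.done = False


class BufferPool:
    """Hypothetical sketch of the reuse_buffers / pending bookkeeping in the output thread."""

    def __init__(self) -> None:
        self.reuse_buffers = []  # buffers whose send has finished
        self.pending = deque()   # (tracker, ref, buffer) still owned by ZMQ

    def reclaim(self) -> None:
        # Oldest sends sit at the right end; recycle buffers whose send finished.
        while self.pending and self.pending[-1][0].done:
            _tracker, _ref, buf = self.pending.pop()
            self.reuse_buffers.append(buf)

    def acquire(self) -> bytearray:
        self.reclaim()
        buf = self.reuse_buffers.pop() if self.reuse_buffers else bytearray()
        buf.clear()  # the encoder overwrites it from scratch
        return buf

    def track(self, tracker: FakeTracker, ref: Any, buf: bytearray) -> None:
        self.pending.appendleft((tracker, ref, buf))


pool = BufferPool()
first = pool.acquire()
t = FakeTracker()
pool.track(t, None, first)
print(pool.acquire() is first)  # False: first is still in flight
t.done = True
print(pool.acquire() is first)  # True: recycled after the send completed
```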
def pause_scheduler(self, mode="abort", clear_cache=True) -> Future | None:
def engine_idle_callback(engine, future):
if clear_cache:
engine._reset_caches()
future.set_result(None)
if mode == "abort":
aborted_reqs = self.scheduler.finish_requests(
None, RequestStatus.FINISHED_ABORTED
)
self._send_abort_outputs(aborted_reqs)
pause_state = PauseState.PAUSED_ALL if mode == "keep" else PauseState.PAUSED_NEW
self.scheduler.set_pause_state(pause_state)
if self._pause_complete():
if clear_cache:
self._reset_caches()
return None
future = Future[Any]()
self._idle_state_callbacks.append(partial(engine_idle_callback, future=future))
return future
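Differences from EngineCore.pause_scheduler:
| Feature | EngineCore | EngineCoreProc |
|---|---|---|
| "wait" mode | ❌ Not supported | ✅ Supported |
| Sends abort outputs on "abort" | ❌ | ✅ _send_abort_outputs |
| Async Future | ❌ Always returns None | ✅ Returns a Future (completed once the engine is idle) |
How "wait" mode works:
- It sets PAUSED_NEW, so new requests are held back while in-flight requests keep stepping.
- Once the engine becomes idle, the idle callback clears the caches (if clear_cache) and completes the Future.
- If _pause_complete() already returns True, the caches are cleared immediately and None is returned.
3.4.11 Output-Sending Helpers (lines 1197-1230)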
def _send_finish_outputs_to_client(
self, req_ids, client_index, finish_reason
) -> None:
outputs = [
EngineCoreOutput(req_id, [], finish_reason=finish_reason)
for req_id in req_ids
]
eco = EngineCoreOutputs(finished_requests=req_ids, outputs=outputs)
self.output_queue.put_nowait((client_index, eco))
def _send_abort_outputs_to_client(self, req_ids, client_index):
self._send_finish_outputs_to_client(req_ids, client_index, FinishReason.ABORT)
def _send_error_outputs_to_client(self, req_ids, client_index):
self._send_finish_outputs_to_client(req_ids, client_index, FinishReason.ERROR)
def _send_abort_outputs(self, aborted_reqs):
if aborted_reqs:
by_client = defaultdict[int, set[str]](set)
for req_id, client_index in aborted_reqs:
by_client[client_index].add(req_id)
for client_index, req_ids in by_client.items():
self._send_abort_outputs_to_client(list(req_ids), client_index)
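Client-routed outputs:
- by_client: groups the aborted request IDs by client_index
- Each client gets its own message, so outputs are routed to the correct frontend
- Different aborted requests may belong to different clients (multi-frontend setups)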
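The grouping step on its own is a small defaultdict exercise. group_by_client is a hypothetical helper name. It sorts the IDs only so that the output is deterministic; the original code just converts each set to a list.

```python
from collections import defaultdict


def group_by_client(aborted_reqs: list[tuple[str, int]]) -> dict[int, list[str]]:
    """Group (req_id, client_index) pairs so each frontend receives one abort message."""
    by_client = defaultdict(set)
    for req_id, client_index in aborted_reqs:
        by_client[client_index].add(req_id)  # a set removes duplicate ids
    return {client: sorted(ids) for client, ids in by_client.items()}


print(group_by_client([("a", 0), ("b", 1), ("c", 0), ("a", 0)]))
# {0: ['a', 'c'], 1: ['b']}
```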
3.5 DPEngineCoreProc Class: The Data-Parallel Engine (MoE)
3.5.1 __init__ Method (lines 1233-1270)
class DPEngineCoreProc(EngineCoreProc):
def __init__(self, vllm_config, local_client, handshake_address, ...):
assert vllm_config.model_config.is_moe, (
"DPEngineCoreProc should only be used for MoE models"
)
self.step_counter = 0
self.current_wave = 0
self.last_counts = (0, 0)
self.pending_pause = False
self.ignore_start_dp_wave = False
self.eep_scaling_state: ElasticEPScalingState | None = None
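MoE-specific state:
| Field | Meaning |
|---|---|
| step_counter | Forward-step counter (a global sync happens every 32 steps) |
| current_wave | Current DP wave number |
| last_counts | Request counts last published (avoids resending unchanged counts) |
| pending_pause | Whether a pause is pending |
| ignore_start_dp_wave | Whether to ignore START_DP_WAVE messages (after pause consensus) |
| eep_scaling_state | Elastic EP scaling state machine |
3.5.2 _init_data_parallel Method (lines 1272-1285)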
def _init_data_parallel(self, vllm_config: VllmConfig):
parallel_config = vllm_config.parallel_config
dp_rank = parallel_config.data_parallel_rank
dp_size = parallel_config.data_parallel_size
local_dp_rank = parallel_config.data_parallel_rank_local
assert dp_size > 1
assert local_dp_rank is not None
assert 0 <= local_dp_rank <= dp_rank < dp_size
self.dp_rank = dp_rank
self.dp_size = dp_size
dp_group, dp_store = parallel_config.stateless_init_dp_group(return_store=True)
self.dp_group, self.dp_store = dp_group, dp_store
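DP initialization: creates a stateless process group used for all-reduce synchronization. Each MoE DP rank handles a different subset of requests, but all ranks must periodically agree on whether any rank still has unfinished requests.
3.5.3 add_request / resume_scheduler: DP Wave Management (lines 1291-1330)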
def add_request(self, request: Request, request_wave: int = 0):
super().add_request(request, request_wave)
if self.has_coordinator and request_wave != self.current_wave:
if request_wave > self.current_wave:
self.current_wave = request_wave
elif (
not self.engines_running
and self.scheduler.pause_state == PauseState.UNPAUSED
):
self.engines_running = True
self.output_queue.put_nowait(
(-1, EngineCoreOutputs(start_wave=self.current_wave))
)
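DP wave tracking applies only when a coordinator exists and request_wave differs from current_wave:
- request_wave > current_wave: a newer wave has started elsewhere, so current_wave is advanced.
- request_wave < current_wave (a wave this rank has already finished) while the engine is idle and unpaused: set engines_running and send start_wave to the coordinator (client_index -1), so all ranks start the next wave.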
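The two branches can be checked with a minimal sketch. WaveSketch is hypothetical: paused stands in for scheduler.pause_state != PauseState.UNPAUSED, and the list sent stands in for output_queue.

```python
from dataclasses import dataclass, field


@dataclass
class WaveSketch:
    """Hypothetical reduction of the wave bookkeeping in DPEngineCoreProc.add_request."""
    current_wave: int = 0
    engines_running: bool = False
    paused: bool = False
    has_coordinator: bool = True
    sent: list = field(default_factory=list)  # (client_index, message) pairs

    def on_add_request(self, request_wave: int) -> None:
        if self.has_coordinator and request_wave != self.current_wave:
            if request_wave > self.current_wave:
                # The frontend already knows about a newer wave: catch up.
                self.current_wave = request_wave
            elif not self.engines_running and not self.paused:
                # Request for a wave this rank already finished: start the next one.
                self.engines_running = True
                self.sent.append((-1, {"start_wave": self.current_wave}))


w = WaveSketch(current_wave=3)
w.on_add_request(5)  # newer wave: just advance
w.on_add_request(4)  # stale wave while idle: ask the coordinator to start wave 5
print(w.current_wave, w.sent)  # 5 [(-1, {'start_wave': 5})]
```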
def resume_scheduler(self):
if self.pending_pause or (self.engines_running and self.ignore_start_dp_wave):
raise RuntimeError(...)
if self.engines_running:
return
super().resume_scheduler()
self.ignore_start_dp_wave = False
has_global_unfinished = ParallelConfig.has_unfinished_dp(
self.dp_group, self.scheduler.has_unfinished_requests()
)
if has_global_unfinished:
self.engines_running = True
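Resuming scheduling is synchronized across all DP ranks. An all-reduce checks whether any rank still has unfinished requests, and engines_running is set only if one does. Resuming while a pause is pending, or while running with ignore_start_dp_wave set, raises an error.
3.5.4 run_busy_loop: The DP Core Loop (lines 1342-1390)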
def run_busy_loop(self):
while self._handle_shutdown():
self._process_input_queue()
if self.eep_scaling_state is not None:
_ = self.eep_scaling_state.progress()
if self.eep_scaling_state.is_complete():
if self.eep_scaling_state.worker_type == "removing":
raise SystemExit
self.process_input_queue_block = True
self.eep_scaling_state = None
executed = self._process_engine_step()
self._maybe_publish_request_counts()
local_unfinished_reqs = self.scheduler.has_unfinished_requests()
if not executed:
if not local_unfinished_reqs and not self.engines_running:
continue
self.execute_dummy_batch()
self.engines_running = self._has_global_unfinished_reqs(
local_unfinished_reqs
)
if not self.engines_running:
client_index = -1 if self.has_coordinator else 0
self.output_queue.put_nowait(
(client_index, EngineCoreOutputs(wave_complete=self.current_wave))
)
self.current_wave += 1
self.step_counter = 0
raise SystemExit
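Key differences between the DP busy loop and the single-engine loop:
| Feature | EngineCoreProc | DPEngineCoreProc |
|---|---|---|
| When idle | Waits on input_queue | Runs a dummy batch to stay in sync with the other DP ranks |
| Requests finished | Simply stops | Needs all-reduce consensus |
| Wave management | None | Yes (wave_complete / start_wave) |
| EEP handling | None | Checks the scaling state on every iteration |
Why the dummy batch is needed: some DP ranks may have requests while others do not. The idle ranks must still run empty batches so they take part in the cross-rank collectives (all-reduce); otherwise the busy ranks would block forever.
3.5.5 _has_global_unfinished_reqs: The DP Consensus Algorithm (lines 1392-1410)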
def _has_global_unfinished_reqs(self, local_unfinished: bool) -> bool:
self.step_counter += 1
if self.step_counter % 32 != 0:
return True
has_unfinished, pause_consensus = ParallelConfig.sync_dp_state(
self.dp_group,
has_unfinished=local_unfinished,
pending_pause=self.pending_pause,
)
if pause_consensus:
self.ignore_start_dp_wave = True
self.pending_pause = False
return has_unfinished
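Optimization: an all-reduce is performed only every 32 steps. In between, the method assumes there are still unfinished requests and returns True, which avoids frequent synchronization overhead.
pause_consensus: when all DP ranks agree to pause, ignore_start_dp_wave is set so that a stale START_DP_WAVE cannot wake the engine up again.
3.5.6 reinitialize_distributed: Elastic Reconfiguration (lines 1412-1465)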
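The every-32-steps shortcut can be illustrated with a minimal single-process simulation. RankSketch and all_reduce_or are hypothetical, and the pending_pause handling is omitted. In the real system every rank enters the collective at the same step. Here one rank simply reads its peers' flags, and any() stands in for the all-reduce.

```python
from dataclasses import dataclass


def all_reduce_or(values: list[bool]) -> bool:
    # Stand-in for the DP all-reduce: True if any rank reports unfinished work.
    return any(values)


@dataclass
class RankSketch:
    step_counter: int = 0
    local_unfinished: bool = False

    def has_global_unfinished(self, all_ranks: list["RankSketch"], sync_every: int = 32) -> bool:
        self.step_counter += 1
        if self.step_counter % sync_every != 0:
            return True  # assume there is still work and skip the collective
        return all_reduce_or([r.local_unfinished for r in all_ranks])


ranks = [RankSketch(), RankSketch()]
results = [ranks[0].has_global_unfinished(ranks) for _ in range(32)]
print(results[:31].count(True), results[31])  # 31 False
```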
def reinitialize_distributed(
self, reconfig_request: ReconfigureDistributedRequest
) -> None:
new_parallel_config = deepcopy(self.vllm_config.parallel_config)
# Update the DP config...
is_scale_down = reconfig_request.new_data_parallel_size < old_dp_size
is_shutdown = (
reconfig_request.new_data_parallel_rank
== ReconfigureRankType.SHUTDOWN_CURRENT_RANK
)
self.eep_scaling_state = ElasticEPScalingState(
model_executor=self.model_executor,
engine_core=self,
vllm_config=self.vllm_config,
new_parallel_config=new_parallel_config,
worker_type="removing" if is_shutdown else "existing",
scale_type="scale_down" if is_scale_down else "scale_up",
reconfig_request=reconfig_request,
)
self.process_input_queue_block = False
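Elastic scaling:
- Deep-copy parallel_config and update the DP parameters
- Create the ElasticEPScalingState state machine
- Set process_input_queue_block = False so input is handled without blocking and the state machine can make progress
- The state machine is advanced with progress() on every iteration of run_busy_loop
3.5.7 _eep_send_engine_core_notification (lines 1467-1500)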
def _eep_send_engine_core_notification(
self, notification_type, vllm_config=None,
):
notification_data = (notification_type.value, dp_rank)
outputs = EngineCoreOutputs(
utility_output=UtilityOutput(
call_id=EEP_NOTIFICATION_CALL_ID,
result=UtilityResult(notification_data),
)
)
if hasattr(self, "output_thread") and self.output_thread.is_alive():
self.output_queue.put_nowait((0, outputs))
else:
# The output thread has not started yet: send directly over ZMQ
encoder = MsgpackEncoder()
with (
zmq.Context() as ctx,
make_zmq_socket(ctx, self.addresses.outputs[0], zmq.PUSH, linger=4000) as socket,
):
socket.send_multipart(encoder.encode(outputs))
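Two send paths:
- Output thread running: send indirectly through output_queue (thread-safe)
- Output thread not started: create a temporary ZMQ PUSH socket and send directly (only happens during initialization)
3.6 EngineCoreActorMixin: Ray Actor Base Class
3.6.1 __init__ Method (lines 1508-1540)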
class EngineCoreActorMixin:
def __init__(self, vllm_config, addresses, dp_rank=0, local_dp_rank=0):
maybe_init_worker_tracer(...)
self.addresses = addresses
vllm_config.parallel_config.data_parallel_index = dp_rank
vllm_config.parallel_config.data_parallel_rank_local = local_dp_rank
self._set_visible_devices(vllm_config, local_dp_rank)
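Special handling for Ray actors:
- No ZMQ handshake is needed (the addresses are known before the actor is created)
- The device-visibility environment variable (e.g. CUDA_VISIBLE_DEVICES) must be set manually, because Ray manages GPUs differently from a directly spawned process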
def _set_cuda_visible_devices(self, vllm_config, local_dp_rank, device_control_env_var):
world_size = vllm_config.parallel_config.world_size
try:
value = get_device_indices(device_control_env_var, local_dp_rank, world_size)
os.environ[device_control_env_var] = value
except IndexError as e:
raise Exception(...)
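Device visibility: compute from local_dp_rank and world_size which devices this rank should see, then write them into the environment variable. An IndexError (not enough visible devices) is turned into an exception with a descriptive message.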
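The article does not show how get_device_indices slices the device list. The sketch below assumes a contiguous-slice rule, in which local DP rank r owns devices [r * world_size, (r + 1) * world_size), and raises IndexError when too few devices are visible, matching the error path above. compute_visible_devices and its num_devices default are hypothetical.

```python
from typing import Optional


def compute_visible_devices(env_value: Optional[str], local_dp_rank: int,
                            world_size: int, num_devices: int = 8) -> str:
    """Hypothetical helper: the contiguous device slice owned by one local DP rank."""
    devices = env_value.split(",") if env_value else [str(i) for i in range(num_devices)]
    start = local_dp_rank * world_size
    end = start + world_size
    if end > len(devices):
        raise IndexError(
            f"local_dp_rank {local_dp_rank} needs devices [{start}, {end}) "
            f"but only {len(devices)} are visible"
        )
    return ",".join(devices[start:end])


print(compute_visible_devices("0,1,2,3", local_dp_rank=1, world_size=2))  # 2,3
```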
@contextmanager
def _perform_handshakes(self, handshake_address, identity, local_client, ...):
yield self.addresses
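Simplified handshake: in Ray actor mode, no ZMQ handshake is needed. The addresses are known before the actor is created, so the existing addresses are yielded directly.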
def wait_for_init(self):
pass
def run(self):
try:
self.run_busy_loop()
except SystemExit:
logger.debug("EngineCore exiting.")
raise
except Exception:
logger.exception("EngineCore encountered a fatal error.")
raise
finally:
self.shutdown()
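Ray actor lifecycle:
- wait_for_init(): an empty method; calling it through ray.get() guarantees that __init__ has finished
- run(): starts the busy loop, with exception handling similar to run_engine_core
3.7 DPMoEEngineCoreActor: MoE DP Ray Actor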
class DPMoEEngineCoreActor(EngineCoreActorMixin, DPEngineCoreProc):
def __init__(self, vllm_config, local_client, addresses, executor_class, log_stats, dp_rank=0, local_dp_rank=0):
vllm_config.parallel_config.data_parallel_rank = dp_rank
EngineCoreActorMixin.__init__(self, vllm_config, addresses, dp_rank, local_dp_rank)
DPEngineCoreProc.__init__(self, vllm_config, local_client, "", executor_class, log_stats)
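MRO (method resolution order): DPMoEEngineCoreActor → EngineCoreActorMixin → DPEngineCoreProc → EngineCoreProc → EngineCore
Initialization order:
- Set the DP rank
- EngineCoreActorMixin.__init__: device visibility + tracer
- DPEngineCoreProc.__init__: the DP engine core (an empty string is passed as the handshake address, since no handshake is needed)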
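The MRO above explains why the mixin's overrides (such as _perform_handshakes) take precedence over EngineCoreProc's. The following uses empty stand-in classes that only borrow the vLLM class names; each describe() reports itself and then defers to super().

```python
class EngineCore:
    def describe(self) -> str:
        return "EngineCore"


class EngineCoreProc(EngineCore):
    def describe(self) -> str:
        return "EngineCoreProc>" + super().describe()


class DPEngineCoreProc(EngineCoreProc):
    def describe(self) -> str:
        return "DPEngineCoreProc>" + super().describe()


class EngineCoreActorMixin:
    def describe(self) -> str:
        # Listed first among the bases, so its overrides win; super() continues down the MRO.
        return "Mixin>" + super().describe()


class DPMoEEngineCoreActor(EngineCoreActorMixin, DPEngineCoreProc):
    pass


print([c.__name__ for c in DPMoEEngineCoreActor.__mro__])
print(DPMoEEngineCoreActor().describe())  # Mixin>DPEngineCoreProc>EngineCoreProc>EngineCore
```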
3.8 EngineCoreActor: Non-MoE Ray Actor
class EngineCoreActor(EngineCoreActorMixin, EngineCoreProc):
def __init__(self, vllm_config, local_client, addresses, executor_class, log_stats, dp_rank=0, local_dp_rank=0):
vllm_config.parallel_config.data_parallel_size = 1
vllm_config.parallel_config.data_parallel_size_local = 1
vllm_config.parallel_config.data_parallel_rank = 0
EngineCoreActorMixin.__init__(self, vllm_config, addresses, dp_rank, local_dp_rank)
EngineCoreProc.__init__(self, vllm_config, local_client, "", executor_class, log_stats, engine_index=dp_rank)
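Non-MoE DP: consistent with the logic in run_engine_core, the DP size is reset to 1 and each rank runs independently (dp_rank is still used as engine_index).
IV. Mermaid Diagram Summary
(Diagrams not preserved: 4.1 class inheritance; 4.2 engine core main loop; 4.3 step main flow without batch queue; 4.4 step with batch queue; 4.5 ZMQ communication architecture; 4.6 data-parallel (MoE) busy loop; 4.7 handshake protocol sequence; 4.8 sleep/wake lifecycle; 4.9 request lifecycle; 4.10 utility RPC mechanism)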
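V. Design Patterns and Architectural Insights
5.1 Producer-Consumer Pattern
The engine core uses a three-queue architecture:
- input_queue: input thread → busy loop
- output_queue: busy loop → output thread
- aborts_queue: input thread → busy loop (fast path)
This design decouples ZMQ I/O from GPU computation. ZMQ releases the GIL during socket operations, so the I/O threads can run while the busy loop computes.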
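5.2 Batch Pipelining Pattern
For pipeline parallelism, batch_queue keeps several batches in flight asynchronously (a generalization of double buffering):
- New batches from the scheduler are pushed on the left end
- The oldest batch is taken from the right end and its result awaited
- Filling the queue takes priority over collecting results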
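5.3 DP Wave Coordination Pattern
MoE data parallelism uses a wave coordination protocol:
- Requests are grouped into waves
- The next wave starts only after every DP rank has finished the current one
- Consensus is reached through an all-reduce (once every 32 steps)
- Idle ranks run dummy batches to stay in sync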
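5.4 Thread-Safety Design
Key thread-safety guarantees:
- mm_receiver_cache: accessed only from the input thread (after initialization it is used only by preprocess_add_request)
- structured_output_manager.grammar_init: called only from the input thread
- scheduler: accessed only from the busy-loop thread (serialized)
- aborts_queue, input_queue, output_queue: Python queue.Queue instances, which are thread-safe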
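5.5 Elastic Scaling State Machine
Elastic EP is implemented through ElasticEPScalingState:
- New engines: worker_type="new"; the state machine starts before KV cache initialization
- Existing engines: worker_type="existing"; the state machine starts after a reconfiguration request arrives
- Departing engines: worker_type="removing"; they raise SystemExit once scaling completes
- process_input_queue_block = False: input is processed without blocking so the state machine can advance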
VI. Key Data-Flow Summary
6.1 Full Request Path
API Request
→ InputProcessor.assign_request_id()
→ EngineCoreRequest (msgspec.Struct)
→ ZMQ DEALER (msgpack encoded, type=b"\x00")
→ Input Socket Thread (decode + preprocess)
→ input_queue (ADD, (Request, wave))
→ Busy Loop (_handle_client_request)
→ scheduler.add_request()
→ scheduler.schedule() → SchedulerOutput
→ executor.execute_model() → Future<ModelRunnerOutput>
→ scheduler.update_from_output() → EngineCoreOutputs
→ output_queue (client_index, EngineCoreOutputs)
→ Output Socket Thread (encode + zero-copy ZMQ send)
→ Frontend PULL socket
→ EngineCoreClient
→ RequestOutput
6.2 Full Initialization Path
EngineCoreProc.__init__()
→ _perform_handshakes() → ZMQ addresses
→ EngineCore.__init__()
→ load_general_plugins()
→ executor_class(vllm_config) → model loading
→ _initialize_kv_caches()
→ executor.get_kv_cache_specs()
→ executor.determine_available_memory() (profile)
→ get_kv_cache_configs() → auto-fit max_model_len
→ executor.initialize_from_config() (allocate + warmup)
→ StructuredOutputManager()
→ Scheduler(vllm_config, kv_cache_config, ...)
→ freeze_gc_heap() + enable_envs_cache()
→ Start I/O threads
→ Wait for DP Coordinator READY