vLLM V1 Engine Deep Dive, Part 1: Core Engine and Data Definitions

Version analyzed: vLLM v1/engine module source
Scope: __init__.py (data definitions) + core.py (core engine)
Depth: line by line


1. Module Role

1.1 Architecture Overview

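vllm/v1/engine/ is the core engine layer of the vLLM v1 architecture, sitting between the frontend above and the execution components below: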

┌─────────────────────────────────────────┐
│  API Server / Frontend (LLMEngine)      │  ← external interface
├─────────────────────────────────────────┤
│  EngineCoreClient                       │  ← IPC client
├─────────────────────────────────────────┤
│  ★ EngineCore / EngineCoreProc ★        │  ← this module
│    (schedule + execute + output)        │
├─────────────────────────────────────────┤
│  Scheduler                              │  ← request scheduling
│  ModelExecutor                          │  ← model execution
│  KV Cache Manager                       │  ← cache management
└─────────────────────────────────────────┘

Core responsibilities:

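  1. Data protocol definitions: __init__.py defines every communication data structure exchanged between the engine and the frontend
  2. Schedule-execute loop: EngineCore in core.py implements the schedule → execute → update main loop
  3. Process management: EngineCoreProc wraps the engine in a child process and communicates over ZMQ
  4. Data parallelism: DPEngineCoreProc adds data-parallel support for MoE models
  5. Actor mode: EngineCoreActor / DPMoEEngineCoreActor provide Ray Actor wrappers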

1.2 File Structure

File, line count, key contents:
  • __init__.py (269 lines): data definitions, i.e. enums, Structs, request/output formats
  • core.py (2145 lines): EngineCore, EngineCoreProc, DPEngineCoreProc, Actor classes

2. __init__.py Line-by-Line Analysis

2.1 Imports (lines 1-17)

import enum
import time
from collections.abc import Mapping
from dataclasses import dataclass
from typing import Any, Literal

import msgspec
import numpy as np
import torch

Line by line:

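Line(s), import, purpose:
  • Lines 1-2, enum and time: enum base classes and timestamps
  • Line 3, Mapping: type annotation for key-value mappings (used by trace_headers etc.)
  • Line 4, dataclass: the Python dataclass decorator (used by EngineCoreReadyResponse)
  • Line 5, Any and Literal: Any is the unconstrained type; Literal restricts a value to the listed literals
  • Line 7, msgspec: key dependency, a high-performance msgpack serialization library used instead of pydantic/dataclass
  • Line 8, numpy: np.ndarray for the routed_experts field
  • Line 9, torch: tensors for prompt_embeds and pooling_output
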
from vllm.lora.request import LoRARequest
from vllm.multimodal.inputs import MultiModalFeatureSpec
from vllm.pooling_params import PoolingParams
from vllm.sampling_params import SamplingParams
from vllm.v1.metrics.stats import PrefillStats, SchedulerStats
from vllm.v1.outputs import LogprobsLists, LogprobsTensors
from vllm.v1.serial_utils import UtilityResult
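
Import and purpose:
  • LoRARequest: LoRA adapter request (dynamically loaded fine-tuned weights)
  • MultiModalFeatureSpec: multimodal feature spec (image/audio/video embeddings)
  • PoolingParams: pooling parameters (for embedding models such as E5/BGE)
  • SamplingParams: sampling parameters (temperature, top_p, top_k, etc.)
  • PrefillStats: prefill-phase statistics
  • SchedulerStats: scheduler statistics
  • LogprobsLists / LogprobsTensors: log-probability outputs, in list and tensor form
  • UtilityResult: result of a generic utility call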

2.2 Type Aliases and Constants (lines 19-26)

# Type for pause_generation mode parameter.
# - "abort": Abort all in-flight requests immediately (default).
# - "wait": Wait for in-flight requests to complete before pausing.
# - "keep": Freeze requests in queue; they resume on resume_generation().
PauseMode = Literal["abort", "wait", "keep"]

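PauseMode: the type of the engine pause mode, one of three values:

  • "abort": immediately abort all in-flight requests (the default and most aggressive mode)
  • "wait": wait for in-flight requests to finish before pausing (the gentlest mode; only supported by EngineCoreProc)
  • "keep": freeze the queued requests and continue processing them on resume (a middle ground)
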
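A Literal alias is only checked by static type checkers, so nothing stops a caller from passing an arbitrary string at runtime. A minimal sketch of a runtime guard, assuming you want to reject unknown modes early; validate_pause_mode is a hypothetical helper, not part of vLLM:

```python
from typing import Literal, get_args

# Same alias as in vllm/v1/engine/__init__.py.
PauseMode = Literal["abort", "wait", "keep"]


def validate_pause_mode(mode: str) -> PauseMode:
    """Hypothetical helper: accept only the three declared pause modes."""
    allowed = get_args(PauseMode)  # ("abort", "wait", "keep")
    if mode not in allowed:
        raise ValueError(f"invalid pause mode {mode!r}; expected one of {allowed}")
    return mode  # type: ignore[return-value]
```

get_args reads the allowed values straight from the alias, so the guard stays in sync if the alias ever changes.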
# These are possible values of RequestOutput.finish_reason,
# so form part of the external API.
FINISH_REASON_STRINGS = ("stop", "length", "abort", "error", "repetition")

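FINISH_REASON_STRINGS: tuple of the string forms of the finish reasons; it is part of the external API contract.

  • "stop": a stop string was emitted
  • "length": max_tokens was consumed or max_model_len was reached
  • "abort": aborted by the client
  • "error": retryable request-level internal error (always mapped to 500)
  • "repetition": repetitive token pattern detected (hallucination detection)
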
EEP_NOTIFICATION_CALL_ID = -1

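EEP_NOTIFICATION_CALL_ID: the fixed call_id for Elastic EP (elastic expert parallelism) notifications. -1 is a reserved value: it marks the message as an internal engine notification rather than an ordinary utility call, and no response is awaited.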

2.3 The EEPNotificationType Enum (lines 28-34)

class EEPNotificationType(enum.Enum):
    NEW_CORE_ENGINES_INIT_READY = "NEW_CORE_ENGINES_INIT_READY"
    NEW_CORE_ENGINES_WEIGHTS_INIT_READY = "NEW_CORE_ENGINES_WEIGHTS_INIT_READY"
    RECONFIGURE_FINISHED = "RECONFIGURE_FINISHED"
    SHUTDOWN_COMPLETE = "SHUTDOWN_COMPLETE"

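EEP (Elastic Expert Parallelism) notification types, with meaning, sender, and receiver:

  • NEW_CORE_ENGINES_INIT_READY: new engine cores finished initialization (sender: newly added engines; receiver: existing engines)
  • NEW_CORE_ENGINES_WEIGHTS_INIT_READY: new engines finished loading weights (sender: newly added engines; receiver: EngineCoreClient)
  • RECONFIGURE_FINISHED: reconfiguration finished (sender: existing engines; receiver: EngineCoreClient)
  • SHUTDOWN_COMPLETE: engines removed by scale-down finished shutting down (sender: exiting engines; receiver: EngineCoreClient)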

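This is the coordination protocol for elastic scale-up and scale-down of MoE models. On scale-up, the new engines tell the existing engines "I am ready"; on scale-down, the exiting engines notify the client so it can release resources.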

2.4 The FinishReason Enum (lines 37-58)

class FinishReason(enum.IntEnum):
    """
    Reason a request finished - stop, length, abort, error, or repetition.

    Int rather than Str for more compact serialization.

    stop - a stop string was emitted
    length - max_tokens was consumed, or max_model_len was reached
    abort - aborted by client
    error - retryable request-level internal error (e.g., KV load failure).
            Invariant: always converted to 500 Internal Server Error.
    repetition - repetitive token pattern detected (hallucination)
    """
    STOP = 0
    LENGTH = 1
    ABORT = 2
    ERROR = 3
    REPETITION = 4

    def __str__(self):
        return FINISH_REASON_STRINGS[self.value]

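Design notes:

  1. IntEnum rather than StrEnum: integers serialize more compactly than strings (a small int takes 1 byte in msgpack versus several bytes for a string), which saves bandwidth for per-request outputs sent at high frequency
  2. The __str__ method: maps by index via FINISH_REASON_STRINGS[self.value], so the external API keeps its string interface
  3. REPETITION = 4: hallucination guard; generation is terminated when the model falls into a repetitive token loop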
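A small sketch of the round trip described above, using the standard-library json module as a stand-in for msgpack (msgpack's own encoding is not shown here): the enum travels as a bare integer and its string form is recovered through __str__.

```python
import enum
import json

FINISH_REASON_STRINGS = ("stop", "length", "abort", "error", "repetition")


class FinishReason(enum.IntEnum):
    STOP = 0
    LENGTH = 1
    ABORT = 2
    ERROR = 3
    REPETITION = 4

    def __str__(self) -> str:
        # Index into the tuple so the external API keeps its string values.
        return FINISH_REASON_STRINGS[self.value]


# On the wire the reason travels as a plain integer...
wire = json.dumps([int(FinishReason.REPETITION)])
# ...and the receiver rebuilds the enum member from that integer.
restored = FinishReason(json.loads(wire)[0])
```

Here wire is the three-character string "[4]", while the string form would need '["repetition"]'.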

(Class diagram: «IntEnum» FinishReason with STOP = 0, LENGTH = 1, ABORT = 2, ERROR = 3, REPETITION = 4 and __str__() → str, which maps by index into FINISH_REASON_STRINGS.)

2.5 EngineCoreReadyResponse (lines 60-68)

@dataclass
class EngineCoreReadyResponse:
    """Sent from EngineCore to each frontend at the end of engine startup.

    Contains post-initialization config that may differ from the original
    values (e.g. max_model_len after KV cache auto-fitting).
    """
    max_model_len: int
    num_gpu_blocks: int
    dp_stats_address: str | None

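Startup-ready response: sent to each frontend once engine initialization completes.

  • max_model_len (int): the final maximum model length (may be reduced by KV cache auto-fitting)
  • num_gpu_blocks (int): number of KV cache blocks on the GPU
  • dp_stats_address (str | None): address where data-parallel statistics are published (only meaningful when DP > 1)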

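Why a dataclass rather than msgspec.Struct? This response is sent only once at startup and does not go through the high-frequency IPC path, so serialization speed does not matter.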

2.6 EngineCoreRequest (lines 70-126)

class EngineCoreRequest(
    msgspec.Struct,
    array_like=True,  # type: ignore[call-arg]
    omit_defaults=True,  # type: ignore[call-arg]
    gc=False,
):  # type: ignore[call-arg]

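msgspec.Struct class keyword arguments:

  • array_like=True (enabled): the Struct is encoded as an array of field values in declaration order rather than as a map keyed by field names, so field names are never transmitted
  • omit_defaults=True (enabled): fields whose value equals the default are omitted when serializing, reducing payload size
  • gc=False (GC tracking disabled): instances are not tracked by Python's cyclic garbage collector; this is safe because these Structs hold no reference cycles, and it reduces GC overhead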
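To see why array_like and omit_defaults shrink messages, the sketch below compares a map-style encoding with a positional one. It uses the standard-library json module as a stand-in for msgpack and a hypothetical three-field Req dataclass; because positional decoding depends on field order, the sketch only trims defaults from the end of the array.

```python
import json
from dataclasses import asdict, dataclass, fields


@dataclass
class Req:
    # Hypothetical three-field message, used only for this comparison.
    request_id: str
    priority: int = 0
    client_index: int = 0


def encode_as_map(r: Req) -> str:
    # Map-style encoding: every field name travels with every message.
    return json.dumps(asdict(r), separators=(",", ":"))


def encode_as_array(r: Req) -> str:
    # array_like-style encoding: field values only, in declaration order.
    values = [getattr(r, f.name) for f in fields(r)]
    defaults = [f.default for f in fields(r)]
    # omit_defaults-style trimming: drop trailing values equal to their default
    # (a middle field cannot be dropped without shifting later positions).
    while len(values) > 1 and values[-1] == defaults[len(values) - 1]:
        values.pop()
    return json.dumps(values, separators=(",", ":"))
```

For Req("req-1") the map form is '{"request_id":"req-1","priority":0,"client_index":0}' while the positional form is just '["req-1"]'.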

Fields one by one:

    request_id: str

Unique request identifier, assigned by InputProcessor.assign_request_id().

    prompt_token_ids: list[int] | None

List of input token IDs. None means an embeddings-only request (only prompt_embeds, no tokens).

    mm_features: list[MultiModalFeatureSpec] | None

List of multimodal features. Image, audio, and video inputs are preprocessed into feature specs.

    sampling_params: SamplingParams | None
    pooling_params: PoolingParams | None

Sampling parameters or pooling parameters; exactly one of the two is set. Sampling is for generative models, pooling for embedding models.

    arrival_time: float

Request arrival timestamp, used for scheduling priority and statistics.

    lora_request: LoRARequest | None

LoRA request, set when the request needs a specific LoRA adapter.

    cache_salt: str | None

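Cache salt: mixed into the prefix-caching block hash so that requests with different salts never share cached KV blocks, isolating the prefix cache between users or tenants.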

    data_parallel_rank: int | None

Data parallel rank: specifies which DP rank should handle this request.

    prompt_embeds: torch.Tensor | None = None

Precomputed prompt embedding tensor, supplied when the prompt is given as embeddings or in mixed mode.

    prompt_is_token_ids: list[bool] | None = None

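Mixed-mode mask: marks each position as a real token (True) or a precomputed embedding (False).

  • Pure token requests: None
  • Pure embedding requests: None
  • Mixed requests: e.g. [True, True, False, False, True] means the first 2 positions are tokens, the middle 2 are embeddings, and the last one is a token
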
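A minimal sketch of reading such a mask as contiguous runs of tokens and embeddings; mask_to_spans is a hypothetical helper for illustration, not a vLLM API:

```python
from itertools import groupby
from typing import List, Tuple


def mask_to_spans(mask: List[bool]) -> List[Tuple[str, int, int]]:
    """Hypothetical helper: turn a prompt_is_token_ids-style mask into
    (kind, start, end) spans, with end exclusive."""
    spans: List[Tuple[str, int, int]] = []
    pos = 0
    for is_token, group in groupby(mask):
        length = sum(1 for _ in group)
        kind = "token" if is_token else "embed"
        spans.append((kind, pos, pos + length))
        pos += length
    return spans
```

For the mixed example above, mask_to_spans([True, True, False, False, True]) yields [("token", 0, 2), ("embed", 2, 4), ("token", 4, 5)].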
    client_index: int = 0

Client index: when several frontends connect to the same engine, it ensures outputs are routed back to the right client.

    current_wave: int = 0

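DP wave number: under data parallelism, the scheduling wave the request expects to belong to. It resolves the race condition in which a request arrives before the wave-complete notification.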

    priority: int = 0

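Request priority (default 0). It only takes effect with the priority scheduling policy, where a lower value is scheduled earlier.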

    trace_headers: Mapping[str, str] | None = None

Distributed tracing headers (OpenTelemetry etc.) for end-to-end tracing.

    resumable: bool = False

Resumable flag: marks that the request can be resumed after preemption instead of being recomputed.

    external_req_id: str | None = None

The original request ID supplied by the user. request_id is the internal ID and external_req_id is the user's ID; this supports the abort(req_id, internal=False) case.

    reasoning_ended: bool | None = None

Whether the reasoning/thinking phase has ended (for o1-style reasoning models running in reasoning mode).

    reasoning_parser_kwargs: dict[str, Any] | None = None

Extra arguments for the reasoning parser (to customize how reasoning tokens are parsed).

The params property:

    @property
    def params(self) -> SamplingParams | PoolingParams:
        """Return the processed params (sampling or pooling)."""
        if self.sampling_params is not None:
            return self.sampling_params
        assert self.pooling_params is not None
        return self.pooling_params

Convenience property for uniform parameter access: returns sampling_params when it is set; otherwise it asserts that pooling_params is set and returns it.

(Class diagram: «msgspec.Struct» EngineCoreRequest, listing the fields described above plus the derived property params: SamplingParams | PoolingParams.)

2.7 EngineCoreEventType and EngineCoreEvent (lines 128-152)

class EngineCoreEventType(enum.IntEnum):
    """The type of engine core request event."""
    QUEUED = 1
    SCHEDULED = 2
    PREEMPTED = 3

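Event types: lifecycle events of a request inside the engine core.

  • QUEUED=1: the request entered the queue
  • SCHEDULED=2: the request was scheduled for execution
  • PREEMPTED=3: the request was preempted (when KV cache runs short, lower-priority requests give way)
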
class EngineCoreEvent(msgspec.Struct):
    """A timestamped engine core event associated with a request."""
    type: EngineCoreEventType
    timestamp: float

    @classmethod
    def new_event(
        cls, event_type: EngineCoreEventType, timestamp: float | None = None
    ) -> "EngineCoreEvent":
        timestamp = time.monotonic() if timestamp is None else timestamp
        return cls(event_type, timestamp)

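Design notes:

  1. Monotonic clock (time.monotonic): unaffected by system clock adjustments, so it is suitable for measuring intervals
  2. Factory method: new_event() supplies a default timestamp, simplifying call sites
  3. Valid only within one process: the source docstring (abbreviated in the snippet above) states that these timestamps must not be compared with timestamps from other processes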
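The sketch below replays the factory method with a dataclass stand-in (so msgspec is not required) and computes a queueing delay from two events recorded in the same process; queue_time is a hypothetical helper. Passing explicit timestamps keeps the example deterministic.

```python
import enum
import time
from dataclasses import dataclass
from typing import List, Optional


class EngineCoreEventType(enum.IntEnum):
    QUEUED = 1
    SCHEDULED = 2
    PREEMPTED = 3


@dataclass
class EngineCoreEvent:
    # dataclass stand-in for the msgspec.Struct of the same name.
    type: EngineCoreEventType
    timestamp: float

    @classmethod
    def new_event(
        cls, event_type: EngineCoreEventType, timestamp: Optional[float] = None
    ) -> "EngineCoreEvent":
        timestamp = time.monotonic() if timestamp is None else timestamp
        return cls(event_type, timestamp)


def queue_time(events: List[EngineCoreEvent]) -> float:
    """Hypothetical helper: seconds from the first QUEUED event to the first
    SCHEDULED event. Only meaningful for timestamps taken in one process."""
    queued = next(e.timestamp for e in events if e.type == EngineCoreEventType.QUEUED)
    scheduled = next(
        e.timestamp for e in events if e.type == EngineCoreEventType.SCHEDULED
    )
    return scheduled - queued
```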

2.8 EngineCoreOutput (lines 154-187)

class EngineCoreOutput(
    msgspec.Struct,
    array_like=True,
    omit_defaults=True,
    gc=False,
):
    request_id: str
    new_token_ids: list[int]

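Core output structure: the output of one request at one step.

  • request_id (str): ID of the corresponding request
  • new_token_ids (list[int]): token IDs generated in this step (may be several, e.g. with speculative decoding)
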
    new_logprobs: LogprobsLists | None = None
    new_prompt_logprobs_tensors: LogprobsTensors | None = None
  • new_logprobs: log probabilities of the new tokens (list form, easy to handle in Python)
  • new_prompt_logprobs_tensors: log probabilities of the prompt tokens (tensor form, efficient)

    pooling_output: torch.Tensor | None = None

Pooling output: the output vector of an embedding model.

    finish_reason: FinishReason | None = None
    stop_reason: int | str | None = None
  • finish_reason: finish-reason enum (None means the request has not finished)
  • stop_reason: the concrete stop cause, either the ID of the stop token (int) or the stop string (str)

    events: list[EngineCoreEvent] | None = None

List of request lifecycle events (for observability and debugging).

    kv_transfer_params: dict[str, Any] | None = None

KV transfer parameters: used to move KV cache between engines (disaggregated prefill).

    trace_headers: Mapping[str, str] | None = None

Trace headers: pass the request's tracing information through.

    prefill_stats: PrefillStats | None = None

Prefill statistics: set only on the step in which prefill completes.

    routed_experts: np.ndarray | None = None

Routed expert indices: array of the expert IDs activated at each step in MoE models (for analysis and debugging).

    num_nans_in_logits: int = 0

Number of NaNs in the logits. A value greater than 0 means the output is corrupted, which makes this a key health signal.

    @property
    def finished(self) -> bool:
        return self.finish_reason is not None

The finished property: returns True only when finish_reason is not None.

(Class diagram: «msgspec.Struct» EngineCoreOutput, listing the fields described above plus the derived property finished: bool; it references FinishReason and EngineCoreEvent.)

2.9 UtilityOutput (lines 189-197)

class UtilityOutput(
    msgspec.Struct,
    array_like=True,
    gc=False,
):
    call_id: int
    failure_message: str | None = None
    result: UtilityResult | None = None

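Utility call output: used for generic RPC calls between the engine and the frontend.

  • call_id: call ID that matches a response to its request
  • failure_message: non-None means the call failed
  • result: the call result (on success)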

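Design pattern: failure_message and result are mutually exclusive; either the call succeeded and carries a result, or it failed and carries a message.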
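A sketch of how a client might pair utility responses with pending calls through call_id, assuming one concurrent.futures.Future per call. UtilityCallTracker is hypothetical, and UtilityOutput here is a dataclass stand-in for the msgspec Struct.

```python
import itertools
from concurrent.futures import Future
from dataclasses import dataclass
from typing import Any, Dict, Optional, Tuple


@dataclass
class UtilityOutput:
    # dataclass stand-in for the msgspec.Struct of the same name.
    call_id: int
    failure_message: Optional[str] = None
    result: Any = None


class UtilityCallTracker:
    """Hypothetical client-side bookkeeping: one Future per outstanding call_id."""

    def __init__(self) -> None:
        self._ids = itertools.count()  # 0, 1, 2, ...
        self._pending: Dict[int, Future] = {}

    def new_call(self) -> Tuple[int, Future]:
        call_id = next(self._ids)
        future: Future = Future()
        self._pending[call_id] = future
        return call_id, future

    def on_output(self, out: UtilityOutput) -> None:
        future = self._pending.pop(out.call_id)
        if out.failure_message is not None:
            # A failure carries only a message, so surface it as an exception.
            future.set_exception(RuntimeError(out.failure_message))
        else:
            future.set_result(out.result)
```

Numbering calls from 0 keeps them clear of the reserved EEP_NOTIFICATION_CALL_ID = -1.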

2.10 EngineCoreOutputs (lines 199-236)

class EngineCoreOutputs(
    msgspec.Struct,
    array_like=True,
    omit_defaults=True,
    gc=False,
):
    engine_index: int = 0
    outputs: list[EngineCoreOutput] = []
    scheduler_stats: SchedulerStats | None = None
    timestamp: float = 0.0
    utility_output: UtilityOutput | None = None
    finished_requests: set[str] | None = None
    wave_complete: int | None = None
    start_wave: int | None = None

    def __post_init__(self):
        if self.timestamp == 0.0:
            self.timestamp = time.monotonic()

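Engine core output batch: all outputs of one step.

  • engine_index: engine index (distinguishes engines under DP)
  • outputs: list of per-request outputs
  • scheduler_stats: scheduler statistics
  • timestamp: timestamp (filled from the monotonic clock by __post_init__)
  • utility_output: utility call response (if this step carries one)
  • finished_requests: set of finished request IDs (lets the frontend clean up its state)
  • wave_complete: DP wave completion signal
  • start_wave: signals that a request belongs to an old wave and a new wave must be started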

Note __post_init__: msgspec.Struct supports this dataclass-style hook too; here it sets the timestamp automatically after construction.
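The same hook pattern with a standard-library dataclass, as a sketch: outputs holds plain strings instead of EngineCoreOutput objects, and only a subset of fields is reproduced. One difference worth noting is that a dataclass needs field(default_factory=list) for the list default, whereas the source Struct declares `= []` directly.

```python
import time
from dataclasses import dataclass, field
from typing import List, Optional, Set


@dataclass
class EngineCoreOutputs:
    # dataclass stand-in for the msgspec.Struct; outputs holds plain strings here.
    engine_index: int = 0
    outputs: List[str] = field(default_factory=list)
    timestamp: float = 0.0
    finished_requests: Optional[Set[str]] = None

    def __post_init__(self) -> None:
        # 0.0 acts as the "not provided" sentinel: stamp with the monotonic clock.
        if self.timestamp == 0.0:
            self.timestamp = time.monotonic()
```

A side effect of the sentinel design is that a caller cannot deliberately pass a timestamp of exactly 0.0; it is always replaced by the current monotonic time.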

2.11 The EngineCoreRequestType Enum (lines 238-249)

class EngineCoreRequestType(enum.Enum):
    """Request types defined as hex byte strings, so it can be sent over
    sockets without separate encoding step."""
    ADD = b"\x00"
    ABORT = b"\x01"
    START_DP_WAVE = b"\x02"
    UTILITY = b"\x03"
    EXECUTOR_FAILED = b"\x04"
    WAKEUP = b"\x05"

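ZMQ frame protocol types, each encoded as a single byte so that parsing needs no extra decoding:

  • b"\x00" ADD: add a new request
  • b"\x01" ABORT: abort requests
  • b"\x02" START_DP_WAVE: start a new DP wave
  • b"\x03" UTILITY: generic RPC call
  • b"\x04" EXECUTOR_FAILED: executor failure signal (internal sentinel)
  • b"\x05" WAKEUP: wake up an idle engine (used during shutdown)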

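Why bytes rather than integers? In ZMQ multipart messages the first frame carries this byte directly as the type identifier, so no separate encode/decode step is needed.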
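A sketch of that framing without a live socket: the list of byte strings stands in for a multipart message (pyzmq's send_multipart/recv_multipart work with such lists), JSON stands in for the msgpack body, and encode_frames/decode_frames are hypothetical helpers.

```python
import enum
import json
from typing import Any, List, Tuple


class EngineCoreRequestType(enum.Enum):
    ADD = b"\x00"
    ABORT = b"\x01"
    START_DP_WAVE = b"\x02"
    UTILITY = b"\x03"
    EXECUTOR_FAILED = b"\x04"
    WAKEUP = b"\x05"


def encode_frames(req_type: EngineCoreRequestType, payload: Any) -> List[bytes]:
    # Frame 0 is the raw type byte; frame 1 is the serialized body.
    return [req_type.value, json.dumps(payload).encode()]


def decode_frames(frames: List[bytes]) -> Tuple[EngineCoreRequestType, Any]:
    # Enum lookup by value: the type frame is used as-is, no decoding step.
    req_type = EngineCoreRequestType(frames[0])
    return req_type, json.loads(frames[1])
```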

2.12 ReconfigureDistributedRequest (lines 251-261)

class ReconfigureDistributedRequest(msgspec.Struct):
    new_data_parallel_size: int
    new_data_parallel_rank: int
    new_data_parallel_rank_local: int
    new_data_parallel_master_ip: str
    new_data_parallel_master_port: int
    new_data_parallel_master_port_list: list[int]
    coord_store_port: int

Distributed reconfiguration request: the configuration update used during Elastic EP scale-up and scale-down.

  • new_data_parallel_size: new DP size
  • new_data_parallel_rank: new global DP rank
  • new_data_parallel_rank_local: new local DP rank
  • new_data_parallel_master_ip: new DP master IP
  • new_data_parallel_master_port: new DP master port
  • new_data_parallel_master_port_list: port list for all DP ranks
  • coord_store_port: coordinator store port

2.13 The ReconfigureRankType Enum (lines 263-269)

class ReconfigureRankType(enum.IntEnum):
    """Rank type for reconfiguring distributed request."""
    KEEP_CURRENT_RANK = -1
    SHUTDOWN_CURRENT_RANK = -2
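
Value and meaning:
  • KEEP_CURRENT_RANK = -1: keep the current rank unchanged
  • SHUTDOWN_CURRENT_RANK = -2: shut down the current rank (it exits on scale-down)

Negative values are reserved so they cannot be confused with normal DP ranks (≥ 0).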
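A sketch of how a rank could interpret these sentinels, assuming they arrive in the new_data_parallel_rank field; resolve_new_rank is a hypothetical helper, not the engine's actual reconfiguration code.

```python
import enum
from typing import Optional


class ReconfigureRankType(enum.IntEnum):
    KEEP_CURRENT_RANK = -1
    SHUTDOWN_CURRENT_RANK = -2


def resolve_new_rank(current_rank: int, new_rank: int) -> Optional[int]:
    """Hypothetical helper: return the rank to use after reconfiguration,
    or None if this engine should shut down."""
    if new_rank == ReconfigureRankType.SHUTDOWN_CURRENT_RANK:
        return None
    if new_rank == ReconfigureRankType.KEEP_CURRENT_RANK:
        return current_rank
    if new_rank < 0:
        # Any other negative value is outside the reserved set.
        raise ValueError(f"unknown reserved rank value {new_rank}")
    return new_rank
```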


3. core.py Line-by-Line Analysis

3.1 Imports (lines 1-57)

import gc
import os
import queue
import signal
import threading
import time
from collections import defaultdict, deque
from collections.abc import Callable, Generator
from concurrent.futures import Future
from contextlib import ExitStack, contextmanager
from enum import IntEnum
from functools import partial
from inspect import isclass, signature
from logging import DEBUG
from multiprocessing.queues import Queue
from typing import Any, TypeVar, cast
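
Import and purpose:
  • gc: freezing/unfreezing the GC (freeze_gc_heap / gc.unfreeze)
  • queue: inter-thread queues (input_queue, aborts_queue)
  • signal: SIGTERM/SIGINT handling
  • threading: I/O thread management
  • deque: the batch queue (a bounded double-ended queue)
  • Future: asynchronous results
  • ExitStack: unified management of multiple context managers
  • IntEnum: EngineShutdownState
  • partial: partial function application (idle_state_callbacks)
  • isclass, signature: runtime type inspection (msgspec argument conversion)
  • Queue: multiprocessing queue for tensor sharing
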
import msgspec
import zmq

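zmq (ZeroMQ): the IPC transport layer between the engine and the frontend. Reasons for choosing ZMQ:

  1. Multiple socket patterns (DEALER/ROUTER/PUSH/PULL/XSUB/XPUB)
  2. Zero-copy message passing
  3. Automatic reconnection
  4. Works across processes and across the network
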
import vllm.envs as envs
from vllm.config import ParallelConfig, VllmConfig
from vllm.distributed import stateless_destroy_torch_distributed_process_group
from vllm.envs import enable_envs_cache
from vllm.logger import init_logger
from vllm.logging_utils.dump_input import dump_engine_exception
from vllm.lora.request import LoRARequest
from vllm.multimodal import MULTIMODAL_REGISTRY
from vllm.tasks import POOLING_TASKS, SupportedTask
from vllm.tracing import instrument, maybe_init_worker_tracer
from vllm.transformers_utils.config import maybe_register_config_serialize_by_value
from vllm.utils import numa_utils
from vllm.utils.gc_utils import freeze_gc_heap, maybe_attach_gc_debug_callback
from vllm.utils.hashing import get_hash_fn_by_name
from vllm.utils.network_utils import make_zmq_socket
from vllm.utils.system_utils import decorate_logs, set_process_title
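
Import and purpose:
  • VllmConfig: global configuration object
  • stateless_destroy_torch_distributed_process_group: stateless teardown of the DP process group
  • dump_engine_exception: dumps scheduler inputs when an exception occurs
  • MULTIMODAL_REGISTRY: multimodal processor registry
  • POOLING_TASKS / SupportedTask: task type definitions
  • instrument: distributed tracing decorator
  • maybe_init_worker_tracer: initializes the worker tracer
  • freeze_gc_heap: freezes the GC heap (reduces GC pressure after startup)
  • get_hash_fn_by_name: looks up a hash function by name (prefix caching)
  • make_zmq_socket: ZMQ socket factory
  • set_process_title: sets the process title (easier monitoring)
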
from vllm.v1.core.kv_cache_utils import (
    BlockHash,
    generate_scheduler_kv_cache_config,
    get_kv_cache_configs,
    get_request_block_hasher,
    init_none_hash,
    resolve_kv_cache_block_sizes,
)
from vllm.v1.core.sched.interface import PauseState, SchedulerInterface
from vllm.v1.core.sched.output import SchedulerOutput
from vllm.v1.engine import (...)
from vllm.v1.engine.tensor_ipc import TensorIpcReceiver
from vllm.v1.engine.utils import (
    EngineHandshakeMetadata,
    EngineZmqAddresses,
    SignalCallback,
    get_device_indices,
)
from vllm.v1.executor import Executor
from vllm.v1.kv_cache_interface import KVCacheConfig
from vllm.v1.metrics.stats import SchedulerStats
from vllm.v1.outputs import ModelRunnerOutput
from vllm.v1.request import Request, RequestStatus
from vllm.v1.serial_utils import MsgpackDecoder, MsgpackEncoder
from vllm.v1.structured_output import StructuredOutputManager
from vllm.v1.utils import compute_iteration_details
from vllm.version import __version__ as VLLM_VERSION

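Key dependencies:

  • SchedulerInterface → abstract scheduler interface
  • Executor → abstract model executor interface
  • Request / RequestStatus → internal request representation
  • MsgpackDecoder / MsgpackEncoder → high-performance serialization
  • StructuredOutputManager → constrained output (JSON schema, etc.)
  • TensorIpcReceiver → cross-process tensor sharing
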
logger = init_logger(__name__)
HANDSHAKE_TIMEOUT_MINS = 5
_R = TypeVar("_R")

  • HANDSHAKE_TIMEOUT_MINS = 5: 5-minute handshake timeout
  • _R: return-type variable for collective_rpc

3.2 The EngineCore Class: the Core Schedule-Execute Loop

3.2.1 The __init__ Method (lines 61-155)

Pseudocode overview:

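EngineCore.__init__(vllm_config, executor_class, log_stats, ...):
    1. Load plugins
    2. Create the model executor
    3. [EEP] Elastic scale-up pre-processing
    4. Initialize the KV cache (including memory profiling)
    5. Create the structured output manager
    6. Create the scheduler
    7. Create the multimodal receiver cache
    8. [If KV connector] Collect handshake metadata
    9. [Pipeline Parallel] Initialize the batch queue
    10. Initialize the request block hasher
    11. Choose the step function (plain / batch queue)
    12. Freeze the GC heap, enable the envs cache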

Section by section:

    def __init__(
        self,
        vllm_config: VllmConfig,
        executor_class: type[Executor],
        log_stats: bool,
        executor_fail_callback: Callable | None = None,
        include_finished_set: bool = False,
    ):
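
Parameters:
  • vllm_config: global configuration
  • executor_class: executor class (not an instance; it is instantiated inside __init__)
  • log_stats: whether to record statistics
  • executor_fail_callback: callback invoked when the executor fails
  • include_finished_set: whether outputs include the set of finished requests
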
        from vllm.plugins import load_general_plugins
        load_general_plugins()

Plugin loading: loads general plugins at the engine/scheduler level, letting third parties extend engine behavior.

        self.vllm_config = vllm_config
        if not vllm_config.parallel_config.data_parallel_rank_local:
            logger.info(
                "Initializing a V1 LLM engine (v%s) with config: %s",
                VLLM_VERSION,
                vllm_config,
            )

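The initialization log is printed only when data_parallel_rank_local is falsy (0 or unset), i.e. by the first engine on each node, which avoids duplicate logs under DP.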

        self.log_stats = log_stats
        self.model_executor = executor_class(vllm_config)
        if executor_fail_callback is not None:
            self.model_executor.register_failure_callback(executor_fail_callback)

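Create the executor and register the failure callback: constructing the executor also performs model loading and other initialization.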

        self.available_gpu_memory_for_kv_cache = -1
        if envs.VLLM_ELASTIC_EP_SCALE_UP_LAUNCH:
            self._eep_scale_up_before_kv_init()

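EEP scale-up pre-processing: if an elastic scale-up is in progress, pre-processing runs before KV cache initialization (the base class raises NotImplementedError; DPEngineCoreProc overrides it).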

        kv_cache_config = self._initialize_kv_caches(vllm_config)
        self.structured_output_manager = StructuredOutputManager(vllm_config)

Initialize the KV cache and the structured output manager: KV cache initialization is the most critical step (detailed in 3.2.2).

        Scheduler = vllm_config.scheduler_config.get_scheduler_cls()

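Look up the scheduler class dynamically: the configuration decides which scheduler implementation is used (for example the default Scheduler, AsyncScheduler when async scheduling is enabled, or a user-supplied class).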

        if len(kv_cache_config.kv_cache_groups) == 0:
            if vllm_config.scheduler_config.enable_chunked_prefill:
                logger.warning("Disabling chunked prefill for model without KVCache")
                vllm_config.scheduler_config.enable_chunked_prefill = False

Models without a KV cache: some encoder-only models need no KV cache, in which case chunked prefill must be disabled (chunked prefill relies on KV cache block management).

        scheduler_block_size, hash_block_size = resolve_kv_cache_block_sizes(
            kv_cache_config, vllm_config
        )
        self.scheduler: SchedulerInterface = Scheduler(
            vllm_config=vllm_config,
            kv_cache_config=kv_cache_config,
            structured_output_manager=self.structured_output_manager,
            include_finished_set=include_finished_set,
            log_stats=self.log_stats,
            block_size=scheduler_block_size,
            hash_block_size=hash_block_size,
        )

Create the scheduler:

  • scheduler_block_size: the block size the scheduler uses when allocating KV cache blocks
  • hash_block_size: the block size used for block-level hashing in prefix caching
        self.use_spec_decode = vllm_config.speculative_config is not None
        if self.scheduler.connector is not None:
            self.model_executor.init_kv_output_aggregator(self.scheduler.connector)

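Speculative decoding and the KV connector:

  • Speculative decoding: a cheap proposer (such as a small draft model) generates draft tokens quickly, and the target model verifies them; use_spec_decode records whether speculative_config is set
  • KV connector: if the scheduler has a KV connector (cross-engine KV cache sharing), the executor's KV output aggregator is initialized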
        mm_registry = MULTIMODAL_REGISTRY
        self.mm_receiver_cache = mm_registry.engine_receiver_cache_from_config(
            vllm_config
        )

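Multimodal receiver cache: built from the config on the engine side to cache multimodal features, avoiding repeated processing at runtime.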

        kv_connector = self.scheduler.get_kv_connector()
        if kv_connector is not None:
            xfer_handshake_metadata = (
                self.model_executor.get_kv_connector_handshake_metadata()
            )
            if xfer_handshake_metadata:
                content: dict[int, Any] = {}
                for worker_dict in xfer_handshake_metadata:
                    if worker_dict is not None:
                        content.update(worker_dict)
                kv_connector.set_xfer_handshake_metadata(content)

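KV transfer handshake:

  1. Collect the KV connector transfer metadata from all workers
  2. Each worker returns a dict of the form {tp_rank: metadata} (or None)
  3. Merge every worker's metadata into a single dict
  4. Hand the result to the KV connector so it has the full cross-worker context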
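Pulled out into a standalone function with plain strings as metadata, the merge behaves as follows (merge_handshake_metadata is a hypothetical name). One consequence worth seeing: if two workers ever reported the same tp_rank key, the later one would silently win.

```python
from typing import Any, Dict, List, Optional


def merge_handshake_metadata(
    per_worker: List[Optional[Dict[int, Any]]],
) -> Dict[int, Any]:
    """Standalone version of the merge above: each worker reports
    {tp_rank: metadata} or None; for a duplicate key the later worker wins."""
    content: Dict[int, Any] = {}
    for worker_dict in per_worker:
        if worker_dict is not None:
            content.update(worker_dict)
    return content
```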
        self.batch_queue_size = self.model_executor.max_concurrent_batches
        self.batch_queue: (
            deque[tuple[Future[ModelRunnerOutput], SchedulerOutput, Future[Any]]] | None
        ) = None
        if self.batch_queue_size > 1:
            logger.debug("Batch queue is enabled with size %d", self.batch_queue_size)
            self.batch_queue = deque(maxlen=self.batch_queue_size)

Batch queue: a key optimization for Pipeline Parallelism:

┌───────────┐    ┌───────────┐    ┌───────────┐
│ Schedule  │ →  │ Execute   │ →  │ Update    │
│ Batch N+2 │    │ Batch N+1 │    │ Batch N   │
└───────────┘    └───────────┘    └───────────┘

When max_concurrent_batches > 1, scheduling and execution can overlap, removing pipeline bubbles. Each queue element is a (sample_future, scheduler_output, exec_future) triple.

        self.is_ec_consumer = (
            vllm_config.ec_transfer_config is None
            or vllm_config.ec_transfer_config.is_ec_consumer
        )
        self.is_pooling_model = vllm_config.model_config.runner_type == "pooling"
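
  • is_ec_consumer: whether this instance consumes encoder cache (EC) transferred from another instance; it is also True when no EC transfer is configured
  • is_pooling_model: whether this is a pooling/embedding model
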
        self.request_block_hasher: Callable[[Request], list[BlockHash]] | None = None
        if vllm_config.cache_config.enable_prefix_caching or kv_connector is not None:
            caching_hash_fn = get_hash_fn_by_name(
                vllm_config.cache_config.prefix_caching_hash_algo
            )
            init_none_hash(caching_hash_fn)
            self.request_block_hasher = get_request_block_hasher(
                hash_block_size, caching_hash_fn
            )

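Request block hasher: the core of prefix caching.

  1. Get the configured hash function (e.g. xxhash, sha256)
  2. Initialize none_hash (the precomputed seed value used as the parent hash of the first block)
  3. Create request_block_hasher, which maps a request's token sequence to a list of block hashes

When prefix caching is enabled, requests that share a prefix can share KV cache blocks and skip recomputation. The hasher is also created when a KV connector is present, even if prefix caching is off.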
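A minimal sketch of chained block hashing, assuming SHA-256 over the parent hash plus the repr of the block's token IDs; NONE_HASH's seed and the byte encoding are hypothetical and do not reproduce vLLM's actual hash inputs. It shows why equal hashes imply equal prefixes: each block hash folds in its parent.

```python
import hashlib
from typing import List

# Hypothetical seed; vLLM derives its own none_hash via init_none_hash().
NONE_HASH = hashlib.sha256(b"hypothetical-none-hash-seed").digest()


def hash_block(parent_hash: bytes, block_tokens: List[int]) -> bytes:
    # Chaining in the parent hash makes each block hash depend on the whole prefix.
    return hashlib.sha256(parent_hash + repr(block_tokens).encode()).digest()


def request_block_hashes(token_ids: List[int], block_size: int) -> List[bytes]:
    hashes: List[bytes] = []
    parent = NONE_HASH
    # Only full blocks are hashed; a trailing partial block is skipped.
    for start in range(0, len(token_ids) - block_size + 1, block_size):
        parent = hash_block(parent, token_ids[start:start + block_size])
        hashes.append(parent)
    return hashes
```

Two requests starting with the same four tokens get the same first hash with block_size=4 and diverge at the first block that differs.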

        self.step_fn = (
            self.step if self.batch_queue is None else self.step_with_batch_queue
        )
        self.async_scheduling = vllm_config.scheduler_config.async_scheduling

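Choose the step function:

  • No batch queue: step(), synchronous schedule-then-execute
  • With a batch queue: step_with_batch_queue(), which overlaps scheduling and execution asynchronously
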
        self.aborts_queue = queue.Queue[list[str]]()
        self._idle_state_callbacks: list[Callable] = []

  • aborts_queue: thread-safe queue of abort requests (input socket thread → core loop)
  • _idle_state_callbacks: callbacks run when the engine becomes idle (used by the pause/wait logic)

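A minimal sketch of the consumer side of aborts_queue: the core loop drains whatever the input thread has queued so far without blocking. drain_aborts is a hypothetical stand-in; the body of _process_aborts_queue is not shown in the snippets above.

```python
import queue
from typing import List


def drain_aborts(aborts_queue: "queue.Queue[List[str]]") -> List[str]:
    """Hypothetical stand-in: collect every batch of request IDs queued so far
    by the input thread, returning as soon as the queue is empty."""
    request_ids: List[str] = []
    while True:
        try:
            request_ids.extend(aborts_queue.get_nowait())
        except queue.Empty:
            return request_ids
```

Using get_nowait keeps the core loop from ever waiting on the input thread; an empty queue simply means there is nothing to abort this step.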
        freeze_gc_heap()
        maybe_attach_gc_debug_callback()
        enable_envs_cache()

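Three post-startup optimizations:

  1. Freeze the GC heap: objects allocated during startup are moved into the permanent generation, so later collections do not rescan them
  2. Attach a GC debug callback: when enabled, prints extra debugging information on each GC run
  3. Enable the envs cache: environment variables no longer change after startup, so environment-variable lookups are cached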
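The idea behind freezing can be shown with the standard gc module. This is a sketch of the mechanism, not vLLM's freeze_gc_heap implementation; freeze_startup_objects is a hypothetical name, and running gc.collect() first is a choice made here so that only surviving objects get frozen.

```python
import gc


def freeze_startup_objects() -> int:
    """Hypothetical helper: collect once, then move every surviving tracked
    object into the permanent generation, which later collections skip."""
    gc.collect()
    gc.freeze()
    return gc.get_freeze_count()  # number of objects now in the permanent generation
```

gc.unfreeze() moves the frozen objects back into the oldest generation.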

(Flowchart of __init__: entry → load plugins → create ModelExecutor → EEP scale-up? yes: _eep_scale_up_before_kv_init → _initialize_kv_caches → create StructuredOutputManager → create Scheduler → create MM receiver cache → KV connector? yes: collect transfer handshake metadata → Pipeline Parallel? yes: initialize batch_queue → initialize request_block_hasher → choose step_fn → freeze_gc_heap + enable_envs_cache.)

3.2.2 The _initialize_kv_caches Method (lines 157-210)

Pseudocode:

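_initialize_kv_caches(vllm_config):
    1. Get the list of KV cache specs
    2. [Has KV cache]
       a. [EEP] reuse the already known GPU memory figure
       b. [non-EEP] run memory profiling → determine the available GPU memory
    3. [No KV cache] available = 0
    4. Compute the config of each KV cache group (block count, sizes)
    5. [Auto-fit] if max_model_len was reduced, sync it to the workers
    6. Generate the scheduler KV cache config
    7. Update cache_config in vllm_config
    8. Initialize the KV cache and warm up
    9. Return the scheduler KV cache config
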
    @instrument(span_name="Prepare model")
    def _initialize_kv_caches(self, vllm_config: VllmConfig) -> KVCacheConfig:
        start = time.time()
        kv_cache_specs = self.model_executor.get_kv_cache_specs()

Get the KV cache specs: fetch from the executor the KV cache spec each model layer needs (number of heads, head dimension, dtype, etc.).

        has_kv_cache = any(kv_cache_spec for kv_cache_spec in kv_cache_specs)
        if has_kv_cache:
            if envs.VLLM_ELASTIC_EP_SCALE_UP_LAUNCH:
                assert self.available_gpu_memory_for_kv_cache > 0
                available_gpu_memory = [self.available_gpu_memory_for_kv_cache] * len(
                    kv_cache_specs
                )
            else:
                available_gpu_memory = self.model_executor.determine_available_memory()
                self.available_gpu_memory_for_kv_cache = available_gpu_memory[0]
        else:
            available_gpu_memory = [0] * len(kv_cache_specs)

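Three-way branch for available memory, by scenario:

  • EEP scale-up: reuse the preset GPU memory figure (profiling is skipped)
  • Normal startup: call determine_available_memory(), which runs a dummy forward pass to measure peak memory
  • No KV cache: set directly to 0
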
        max_model_len_before = vllm_config.model_config.max_model_len
        kv_cache_configs = get_kv_cache_configs(
            vllm_config, kv_cache_specs, available_gpu_memory
        )
        max_model_len_after = vllm_config.model_config.max_model_len
        if max_model_len_after != max_model_len_before:
            self.collective_rpc("update_max_model_len", args=(max_model_len_after,))

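Auto-fit detection: get_kv_cache_configs may shrink max_model_len automatically when GPU memory is insufficient. If that happens, the new value must be pushed to all workers via collective_rpc, because the workers were started before memory profiling and still hold the original, larger value.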
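Some back-of-the-envelope arithmetic for the block count and for what auto-fitting could mean, as a simplified sketch: it assumes a single full-attention KV cache group in which one block stores K and V for block_size tokens in every layer, and the capping rule in auto_fit_max_model_len is hypothetical. The real get_kv_cache_configs handles multiple groups and its exact fitting rule is not shown here.

```python
def num_kv_blocks(
    available_bytes: int,
    num_layers: int,
    block_size: int,
    num_kv_heads: int,
    head_dim: int,
    dtype_bytes: int,
) -> int:
    """Simplified estimate: one block holds K and V (factor 2) for
    block_size tokens across all layers."""
    bytes_per_block = 2 * num_layers * block_size * num_kv_heads * head_dim * dtype_bytes
    return available_bytes // bytes_per_block


def auto_fit_max_model_len(max_model_len: int, num_blocks: int, block_size: int) -> int:
    # Hypothetical rule: cap max_model_len at what the whole cache can hold.
    return min(max_model_len, num_blocks * block_size)
```

With 8 GiB free, 32 layers, block_size=16, 8 KV heads, head_dim=128 and 2-byte dtype, each block takes 2 MiB, giving 4096 blocks; a configured max_model_len of 131072 would then be capped at 4096 × 16 = 65536 tokens.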

        scheduler_kv_cache_config = generate_scheduler_kv_cache_config(kv_cache_configs)
        vllm_config.cache_config.num_gpu_blocks = scheduler_kv_cache_config.num_blocks
        kv_cache_groups = scheduler_kv_cache_config.kv_cache_groups
        if kv_cache_groups:
            vllm_config.cache_config.block_size = min(
                g.kv_cache_spec.block_size for g in kv_cache_groups
            )
        vllm_config.validate_block_size()

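Config sync: the KV cache config is written back into vllm_config to keep them consistent. block_size is set to the smallest block size across all KV cache groups, and validate_block_size() then checks the result.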

        self.model_executor.initialize_from_config(kv_cache_configs)

Final initialization: allocate KV cache memory on the GPU according to the computed configs and run warmup.

3.2.3 The add_request / abort_requests Methods (lines 212-237)

    def add_request(self, request: Request, request_wave: int = 0):
        if not isinstance(request.request_id, str):
            raise TypeError(
                f"request_id must be a string, got {type(request.request_id)}"
            )
        if pooling_params := request.pooling_params:
            supported_pooling_tasks = [
                task for task in self.get_supported_tasks() if task in POOLING_TASKS
            ]
            if pooling_params.task not in supported_pooling_tasks:
                raise ValueError(...)
        if request.kv_transfer_params is not None and (
            not self.scheduler.get_kv_connector()
        ):
            logger.warning(
                "Got kv_transfer_params, but no KVConnector found. "
                "Disabling KVTransfer for this request."
            )
        self.scheduler.add_request(request)

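Request admission logic:

  1. Type check: request_id must be a str, otherwise a TypeError is raised
  2. Task check: a pooling request's task must be among the supported pooling tasks
  3. KV transfer downgrade: if the request asks for KV transfer but no connector exists, a warning is logged instead of rejecting the request
  4. Finally: delegate to the scheduler's add_request
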
    def abort_requests(self, request_ids: list[str]):
        self.scheduler.finish_requests(request_ids, RequestStatus.FINISHED_ABORTED)

Abort: the requests are marked as finished with status FINISHED_ABORTED; the scheduler handles them on the next update.

3.2.4 The log_error_detail / log_iteration_details Context Managers (lines 239-272)

    @contextmanager
    def log_error_detail(self, scheduler_output: SchedulerOutput):
        try:
            yield
        except Exception as err:
            dump_engine_exception(
                self.vllm_config, scheduler_output, self.scheduler.make_stats()
            )
            raise err

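Error detail dump: when model execution raises, the scheduler inputs and stats are dumped to the log to help debugging, and the original exception is re-raised. dump_engine_exception is exception-safe (it does not raise a new exception).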
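The dump-and-reraise pattern in isolation, as a sketch: a list stands in for the log sink and a dict for the scheduler state, and the inner try/except plays the role of dump_engine_exception's exception safety so that a failing dump can never mask the original error.

```python
from contextlib import contextmanager


@contextmanager
def log_error_detail(context: dict, sink: list):
    """Record diagnostic context when the body fails, then re-raise the
    original exception unchanged."""
    try:
        yield
    except Exception:
        try:
            sink.append(dict(context))  # stand-in for dump_engine_exception
        except Exception:
            pass  # the dump itself must never mask the original error
        raise
```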

    @contextmanager
    def log_iteration_details(self, scheduler_output: SchedulerOutput):
        if not self.vllm_config.observability_config.enable_logging_iteration_details:
            yield
            return
        ...
        iteration_details = compute_iteration_details(scheduler_output)
        before = time.monotonic()
        yield
        logger.info(
            "Iteration(N): M context requests, N context tokens, "
            "K generation requests, L generation tokens, elapsed: X ms"
        )
        self._iteration_index += 1

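Iteration detail logging: when enabled, the time around each step is measured and the request/token counts are logged. Note that yield sits in the middle: the start time is recorded before the step body runs, and the log line is emitted after it.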
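The same yield-in-the-middle timing pattern as a runnable sketch: a list of (iteration_index, elapsed_ms) tuples replaces the logger, and the enabled flag replaces the observability config. As in the original, nothing is recorded if the body raises, since the code after yield is skipped.

```python
import time
from contextlib import contextmanager


@contextmanager
def log_iteration_details(enabled: bool, records: list):
    """No-op when disabled; otherwise time the body and append
    (iteration_index, elapsed_ms) to records."""
    if not enabled:
        yield
        return
    before = time.monotonic()  # runs before the body
    yield
    elapsed_ms = (time.monotonic() - before) * 1000  # runs after the body
    records.append((len(records), elapsed_ms))
```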

3.2.5 The step Method (lines 274-298): the Core Main Loop

    def step(self) -> tuple[dict[int, EngineCoreOutputs], bool]:
        """Schedule, execute, and make output.

        Returns tuple of outputs and a flag indicating whether the model
        was executed.
        """

Return value (outputs_dict, model_executed):

  • outputs_dict: a {client_index: EngineCoreOutputs} mapping
  • model_executed: whether this step actually ran the model (there may have been nothing to schedule)

        if not self.scheduler.has_requests():
            return {}, False

Fast path: if the scheduler holds no requests at all (including running ones), return an empty result immediately.

        scheduler_output = self.scheduler.schedule()
        future = self.model_executor.execute_model(scheduler_output, non_block=True)
        grammar_output = self.scheduler.get_grammar_bitmask(scheduler_output)

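Three core operations:

  1. schedule(): the scheduler decides which requests run in this step and allocates KV cache blocks
  2. execute_model(non_block=True): launches the model forward pass asynchronously and returns a Future
  3. get_grammar_bitmask(): computes the grammar bitmask for structured output (JSON schema constraints)

Note non_block=True: the executor starts asynchronously, so the grammar bitmask is computed while the forward pass runs instead of blocking the scheduling loop.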

        with (
            self.log_error_detail(scheduler_output),
            self.log_iteration_details(scheduler_output),
        ):
            model_output = future.result()
            if model_output is None:
                model_output = self.model_executor.sample_tokens(grammar_output)

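Wait for the model output and sample:

  1. future.result(): block until the forward pass finishes
  2. If model_output is None: execution only ran the forward pass, so tokens are sampled separately via sample_tokens(grammar_output)
    • some executors (e.g. the GPU one) separate the forward pass from sampling
  3. If model_output is not None: the executor has already completed both the forward pass and sampling
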
        self._process_aborts_queue()
        engine_core_outputs = self.scheduler.update_from_output(
            scheduler_output, model_output
        )
        return engine_core_outputs, scheduler_output.total_num_scheduled_tokens > 0

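Post-processing:

  1. Process the abort queue (aborts received while the model was executing)
  2. update_from_output(): the scheduler updates its internal state from the model output, advancing tokens, checking stop conditions, and freeing KV cache blocks
  3. Return the outputs and whether the model was executed (total_num_scheduled_tokens > 0)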
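The control flow of step() can be exercised end to end with toy components. Everything below (ToyScheduler, ToyExecutor, the dummy token id 1, passing the scheduled list to sample_tokens instead of a grammar bitmask) is hypothetical; only the order of operations mirrors the method above.

```python
from concurrent.futures import Future
from typing import Any, Dict, List, Optional, Tuple


class ToyScheduler:
    """Hypothetical scheduler: each request needs a fixed number of decode steps."""

    def __init__(self) -> None:
        self.remaining: Dict[str, int] = {}

    def add_request(self, req_id: str, num_steps: int) -> None:
        self.remaining[req_id] = num_steps

    def has_requests(self) -> bool:
        return bool(self.remaining)

    def schedule(self) -> List[str]:
        return list(self.remaining)  # schedule every request each step

    def update_from_output(
        self, scheduled: List[str], sampled: Dict[str, int]
    ) -> Dict[str, Any]:
        finished = []
        for req_id in scheduled:
            self.remaining[req_id] -= 1
            if self.remaining[req_id] == 0:  # stop condition reached
                del self.remaining[req_id]
                finished.append(req_id)
        return {"tokens": sampled, "finished": finished}


class ToyExecutor:
    """Hypothetical executor whose forward pass leaves sampling pending."""

    def execute_model(
        self, scheduled: List[str], non_block: bool = True
    ) -> "Future[Optional[Dict[str, int]]]":
        future: Future = Future()
        future.set_result(None)  # None: forward pass done, sampling pending
        return future

    def sample_tokens(self, scheduled: List[str]) -> Dict[str, int]:
        return {req_id: 1 for req_id in scheduled}  # dummy token id 1


def step(scheduler: ToyScheduler, executor: ToyExecutor) -> Tuple[Dict[str, Any], bool]:
    if not scheduler.has_requests():
        return {}, False  # fast path
    scheduled = scheduler.schedule()
    future = executor.execute_model(scheduled, non_block=True)
    model_output = future.result()  # wait for the forward pass
    if model_output is None:
        model_output = executor.sample_tokens(scheduled)
    outputs = scheduler.update_from_output(scheduled, model_output)
    return outputs, len(scheduled) > 0
```

With request "a" needing two steps and "b" one, the first call finishes "b", the second finishes "a", and the third takes the fast path and returns ({}, False).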

(Flowchart of step(): entry → scheduler has requests? no: return empty outputs; yes: scheduler.schedule → execute_model(non_block) → get_grammar_bitmask → wait on future.result → model_output is None? yes: sample_tokens; no: use model_output → _process_aborts_queue → scheduler.update_from_output → return outputs + model_executed.)

3.2.6 post_step method (lines 300-307)
    def post_step(self, model_executed: bool) -> None:
        if not self.async_scheduling and self.use_spec_decode and model_executed:
            draft_token_ids = self.model_executor.take_draft_token_ids()
            if draft_token_ids is not None:
                self.scheduler.update_draft_token_ids(draft_token_ids)

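Post-step handling. This runs only when scheduling is synchronous, speculative decoding is enabled and the model actually ran:

  1. Take the draft token IDs from the executor
  2. Update the draft token information in the scheduler (to be verified/accepted in the next scheduling round)

In async scheduling mode, draft tokens are updated directly inside the worker process, so the main loop does not need to intervene.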

3.2.7 step_with_batch_queue method (lines 309-381): the pipeline-parallel core loop
    def step_with_batch_queue(
        self,
    ) -> tuple[dict[int, EngineCoreOutputs] | None, bool]:

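Key difference:

  • step(): synchronous, completes a whole step in one call
  • step_with_batch_queue(): asynchronous, pipelines batches. It may return None when a new batch was enqueued but no output is ready yet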

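Pseudocode:

step_with_batch_queue():
    1. If the scheduler has requests:
       a. schedule → execute asynchronously → [optional] sample asynchronously
       b. push (future, scheduler_output, exec_future) onto the left end of the queue
       c. if the model ran, the queue is not full, and the oldest batch is not done → return (None, True) (keep scheduling)
    2. If the scheduler has no requests and the queue is empty → return (None, False)
    3. Pop the oldest batch from the right end of the queue and wait for it to finish
    4. Process aborts, update the scheduler from the output
    5. Handle deferred sampling (the interaction between structured output and speculative decoding)
    6. return (outputs, model_executed)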

Detailed analysis:

        model_executed = False
        deferred_scheduler_output = None
        if self.scheduler.has_requests():
            scheduler_output = self.scheduler.schedule()
            with self.log_error_detail(scheduler_output):
                exec_future = self.model_executor.execute_model(
                    scheduler_output, non_block=True
                )
            if self.is_ec_consumer:
                model_executed = scheduler_output.total_num_scheduled_tokens > 0

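EC role check: EC refers to encoder-cache transfer in disaggregated deployments. Depending on the instance's role, a step may not run the full model, so model_executed is determined differently here.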

            if self.is_pooling_model or not model_executed:
                future = cast(Future[ModelRunnerOutput], exec_future)
            else:
                if not scheduler_output.pending_structured_output_tokens:
                    grammar_output = self.scheduler.get_grammar_bitmask(
                        scheduler_output
                    )
                    future = self.model_executor.sample_tokens(
                        grammar_output, non_block=True
                    )
                else:
                    deferred_scheduler_output = scheduler_output

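Three-way sampling decision:

| Condition | Action |
|---|---|
| Pooling model, or model not executed | use exec_future directly |
| Model executed, no pending structured-output tokens | sample immediately |
| Model executed, pending structured-output tokens | defer sampling (deferred_scheduler_output) |

Why sampling is deferred: with speculative decoding plus structured output, the draft tokens must be validated before the grammar bitmask can be computed. The current step's draft tokens come from the previous step's output, so they can only be processed once the previous step's model_output is available.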

            if not deferred_scheduler_output:
                batch_queue.appendleft((future, scheduler_output, exec_future))
                if (
                    model_executed
                    and len(batch_queue) < self.batch_queue_size
                    and not batch_queue[-1][0].done()
                ):
                    return None, True

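Fill the queue first: if the new batch was enqueued, the queue is not yet full and the oldest batch is still running, do not block on output. Return and schedule another batch instead. The return value (None, True) means this call produced no output even though the model ran.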

        future, scheduler_output, exec_model_fut = batch_queue.pop()
        with (
            self.log_error_detail(scheduler_output),
            self.log_iteration_details(scheduler_output),
        ):
            model_output = future.result()
            if model_output is None:
                exec_model_fut.result()
                raise RuntimeError("unexpected error")

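Wait for the oldest batch: batch_queue.pop() removes the oldest batch from the right end. If the sampling future returns None, the original execute_model call failed, and exec_model_fut.result() re-raises that exception (the RuntimeError is only a fallback).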

        if deferred_scheduler_output:
            if self.use_spec_decode:
                draft_token_ids = self.model_executor.take_draft_token_ids()
                assert draft_token_ids is not None
                self.scheduler.update_draft_token_ids_in_output(
                    draft_token_ids, deferred_scheduler_output
                )
            grammar_output = self.scheduler.get_grammar_bitmask(
                deferred_scheduler_output
            )
            future = self.model_executor.sample_tokens(grammar_output, non_block=True)
            batch_queue.appendleft((future, deferred_scheduler_output, exec_future))

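Deferred sampling:

  1. If speculative decoding is enabled, first take the draft token IDs and write them into the deferred scheduler output
  2. Then compute the grammar bitmask
  3. Finally sample asynchronously and push the new batch onto the queue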
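The three-way branch in the table above reduces to a small pure function. The sketch below is a restatement for clarity. The enum and function names are hypothetical; the conditions are copied from the excerpt (is_pooling_model, model_executed, pending_structured_output_tokens).

```python
from enum import Enum


class SamplingAction(Enum):
    USE_EXEC_FUTURE = "use exec_future directly"
    SAMPLE_NOW = "sample immediately"
    DEFER = "defer sampling"


def choose_sampling_action(is_pooling_model, model_executed, pending_structured_output_tokens):
    # Pooling models have no sampling step; an unexecuted step has nothing to sample.
    if is_pooling_model or not model_executed:
        return SamplingAction.USE_EXEC_FUTURE
    # No draft tokens waiting on grammar validation: the bitmask can be built now.
    if not pending_structured_output_tokens:
        return SamplingAction.SAMPLE_NOW
    # Otherwise wait until the previous batch's output is available.
    return SamplingAction.DEFER
```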

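Flowchart of step_with_batch_queue():

step_with_batch_queue entry → scheduler has requests?
  ├─ Yes → schedule + execute_model → deferred sampling needed?
  │          ├─ No  → enqueue (may return None)
  │          └─ Yes → mark as deferred
  ├─ No, queue empty     → return None, False
  └─ No, queue non-empty → pop oldest batch
pop oldest batch → wait on future.result → _process_aborts_queue → update_from_output
  → deferred sampling pending?
       ├─ Yes → draft tokens + grammar bitmask + sample
       └─ No
  → return outputs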
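The pipelining behaviour can be observed with threads standing in for GPU batches. This is a minimal sketch under stated assumptions. ToyBatchQueueEngine is hypothetical; a threading.Event gate keeps "in-flight" batches from finishing, so the test is deterministic. Sampling, aborts and deferred handling are omitted. New batches go on the left of the deque and the oldest is popped from the right, as in the excerpt.

```python
import threading
from collections import deque
from concurrent.futures import ThreadPoolExecutor


class ToyBatchQueueEngine:
    """Hypothetical engine showing the batch-queue pipelining pattern."""

    def __init__(self, batches, batch_queue_size=2):
        self.waiting = deque(batches)       # batches the "scheduler" still has
        self.batch_queue = deque()          # newest on the left, oldest on the right
        self.batch_queue_size = batch_queue_size
        self.gate = threading.Event()       # holds batches "on the GPU" until set
        self.pool = ThreadPoolExecutor(max_workers=batch_queue_size)

    def _run(self, batch):
        self.gate.wait()                    # simulated forward pass
        return f"out-{batch}"

    def step_with_batch_queue(self):
        model_executed = False
        if self.waiting:
            batch = self.waiting.popleft()
            future = self.pool.submit(self._run, batch)
            model_executed = True
            self.batch_queue.appendleft((future, batch))
            if (
                len(self.batch_queue) < self.batch_queue_size
                and not self.batch_queue[-1][0].done()
            ):
                return None, True           # keep filling the pipeline
        elif not self.batch_queue:
            return None, False              # nothing scheduled, nothing in flight
        future, _batch = self.batch_queue.pop()   # oldest batch
        return future.result(), model_executed
```

With a queue size of 2, the first call only enqueues. The second call schedules another batch and then collects the first one. The remaining calls drain the queue.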

3.2.8 Helper methods (lines 383-430)
    def _process_aborts_queue(self):
        if not self.aborts_queue.empty():
            request_ids = []
            while not self.aborts_queue.empty():
                ids = self.aborts_queue.get_nowait()
                request_ids.extend((ids,) if isinstance(ids, str) else ids)
            self.abort_requests(request_ids)

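Batched abort handling: all aborts in the queue are merged into a single abort_requests call, which is cheaper than handling them one by one. Each queue item is either a single request ID (str) or a collection of IDs.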
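The draining loop can be isolated as a standalone function. This sketch copies the loop body from the excerpt but returns the flattened list instead of calling abort_requests; the function name is hypothetical. The isinstance(ids, str) check matters: extending a list with a bare string would split it into characters.

```python
import queue


def drain_aborts(aborts_queue):
    """Collect every queued abort into one flat list (sketch of _process_aborts_queue)."""
    request_ids = []
    while not aborts_queue.empty():
        ids = aborts_queue.get_nowait()
        # An item is either a single request id or a collection of ids.
        request_ids.extend((ids,) if isinstance(ids, str) else ids)
    return request_ids
```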

    def shutdown(self):
        self.structured_output_manager.clear_backend()
        if self.model_executor:
            self.model_executor.shutdown()
        if self.scheduler:
            self.scheduler.shutdown()
        gc.unfreeze()

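Shutdown order: structured output backend → executor → scheduler → unfreeze GC. gc.unfreeze() matters because it makes the objects frozen at startup collectable again, which avoids GPU memory leaks when the engine is deleted in-process.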
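The excerpt only shows the gc.unfreeze() side. The sketch below demonstrates the standard-library freeze/unfreeze pair the paragraph refers to, independent of vLLM. Frozen objects sit in a permanent generation that the collector ignores until they are unfrozen.

```python
import gc


def freeze_then_unfreeze():
    gc.collect()
    gc.freeze()                      # move all currently tracked objects to the permanent generation
    frozen = gc.get_freeze_count()   # the collector now ignores these objects
    gc.unfreeze()                    # return them to the oldest generation so they can be collected
    return frozen, gc.get_freeze_count()
```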

    def profile(self, is_start: bool = True, profile_prefix: str | None = None):
        self.model_executor.profile(is_start, profile_prefix)

Profiling: delegated to the executor, with start/stop controlled by is_start.

3.2.9 Cache reset methods (lines 440-465)
    def reset_mm_cache(self):
        if self.scheduler.has_unfinished_requests():
            logger.warning(...)
        if self.mm_receiver_cache is not None:
            self.mm_receiver_cache.clear_cache()
        self.model_executor.reset_mm_cache()

    def reset_prefix_cache(self, reset_running_requests=False, reset_connector=False) -> bool:
        return self.scheduler.reset_prefix_cache(reset_running_requests, reset_connector)

    def reset_encoder_cache(self) -> None:
        if self.scheduler.has_unfinished_requests():
            logger.warning(...)
        self.scheduler.reset_encoder_cache()
        self.model_executor.reset_encoder_cache()

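Three kinds of cache reset:

| Cache | Logical layer | Physical layer | Purpose |
|---|---|---|---|
| MM cache | receiver_cache | executor | multimodal features |
| Prefix cache | scheduler | - | KV cache prefix sharing |
| Encoder cache | scheduler | executor | encoder outputs |

reset_mm_cache and reset_encoder_cache log a warning when unfinished requests exist, because resetting then may leave caches out of sync. reset_prefix_cache delegates entirely to the scheduler and returns its result.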

    def _reset_caches(self, reset_running_requests=True) -> None:
        self.reset_prefix_cache(reset_running_requests=reset_running_requests)
        self.reset_mm_cache()
        self.reset_encoder_cache()

Unified reset: called when pausing with clear_cache=True (including via sleep); it clears all caches.

3.2.10 Pause / resume / sleep methods (lines 467-531)
    def pause_scheduler(
        self, mode: PauseMode = "abort", clear_cache: bool = True
    ) -> Future | None:
        if mode not in ("keep", "abort", "wait"):
            raise ValueError(f"Invalid pause mode: {mode}")
        if mode == "wait":
            raise ValueError("'wait' mode can't be used in inproc-engine mode")

        if mode == "abort":
            self.scheduler.finish_requests(None, RequestStatus.FINISHED_ABORTED)

        pause_state = PauseState.PAUSED_ALL if mode == "keep" else PauseState.PAUSED_NEW
        self.scheduler.set_pause_state(pause_state)
        if clear_cache:
            self._reset_caches()

        return None

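EngineCore.pause_scheduler runs in in-process mode and does not support "wait":

| Mode | Behavior |
|---|---|
| "abort" | abort all requests + PAUSED_NEW + clear caches |
| "wait" | ❌ not supported (in-process mode cannot wait for in-flight requests to drain) |
| "keep" | PAUSED_ALL + clear caches (requests stay in the queue) |

Caches are cleared only when clear_cache=True (the default).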
    def sleep(self, level: int = 1, mode: PauseMode = "abort") -> None | Future:

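Sleep levels:

| Level | Effect |
|---|---|
| 0 | pause scheduling only; GPU memory unchanged |
| 1 | offload model weights to CPU, discard the KV cache |
| 2 | discard all GPU memory (weights and KV cache) |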
        pause_future = self.pause_scheduler(mode=mode, clear_cache=level >= 1)
        if level < 1:
            return pause_future
        model_executor = self.model_executor
        if pause_future is None:
            model_executor.sleep(level)
            return None
        future = Future[Any]()
        def pause_complete(f: Future):
            try:
                f.result()
                future.set_result(model_executor.sleep(level))
            except Exception as e:
                future.set_exception(e)
        pause_future.add_done_callback(pause_complete)
        return future

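Sleep flow: pause scheduling first, then let the executor release GPU memory. EngineCore.pause_scheduler always returns None, so the executor sleeps immediately. When an overriding pause_scheduler returns a Future (EngineCoreProc does so until the engine becomes idle), sleep runs only after the pause completes, and the returned Future carries the executor's result or the pause's exception.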
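The Future chaining in sleep() can be exercised on its own with concurrent.futures.Future. In this sketch, executor_sleep is a hypothetical callable standing in for model_executor.sleep; the control flow is copied from the excerpt. Future callbacks run in the thread that completes the pause future, so no extra thread is needed.

```python
from concurrent.futures import Future


def sleep_after_pause(pause_future, executor_sleep, level):
    """Sketch of the chaining in EngineCore.sleep (executor_sleep is hypothetical)."""
    if pause_future is None:
        executor_sleep(level)        # pause already complete: sleep right away
        return None
    future = Future()

    def pause_complete(f):
        try:
            f.result()               # re-raises if the pause itself failed
            future.set_result(executor_sleep(level))
        except Exception as e:
            future.set_exception(e)

    pause_future.add_done_callback(pause_complete)
    return future
```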

    def wake_up(self, tags: list[str] | None = None):
        if tags is not None and "scheduling" in tags:
            tags = [t for t in tags if t != "scheduling"]
        if tags is None or tags:
            self.model_executor.wake_up(tags)
        self.resume_scheduler()

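Wake-up flow: restore GPU memory first (if needed), then resume scheduling. The "scheduling" tag is stripped before calling the executor, so tags=["scheduling"] resumes scheduling only, without touching GPU memory (waking from level 0).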
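The tag filtering in wake_up() decides whether the executor is woken at all. The sketch below isolates that decision (the function name is hypothetical). None means "wake everything". A list that becomes empty after removing "scheduling" skips the executor. Tag names other than "scheduling" are illustrative only.

```python
def split_wake_up_tags(tags):
    """Return (wake_executor, executor_tags) following the logic of wake_up()."""
    if tags is not None and "scheduling" in tags:
        tags = [t for t in tags if t != "scheduling"]
    # None: wake everything; empty list: only scheduling was requested.
    wake_executor = tags is None or bool(tags)
    return wake_executor, tags
```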

3.2.11 LoRA / collective_rpc / preprocess methods (lines 533-575)
    def add_lora(self, lora_request: LoRARequest) -> bool:
        return self.model_executor.add_lora(lora_request)

    def remove_lora(self, lora_id: int) -> bool:
        return self.model_executor.remove_lora(lora_id)

    def list_loras(self) -> set[int]:
        return self.model_executor.list_loras()

    def pin_lora(self, lora_id: int) -> bool:
        return self.model_executor.pin_lora(lora_id)

LoRA proxy methods: all delegate to the executor. LoRA adapters are model adapters loaded dynamically at runtime.

    def collective_rpc(
        self,
        method: str | Callable[..., _R],
        timeout: float | None = None,
        args: tuple = (),
        kwargs: dict[str, Any] | None = None,
    ) -> list[_R]:
        return self.model_executor.collective_rpc(method, timeout, args, kwargs)

collective_rpc: runs the same method on every worker and returns the list of results. It is used for broadcast-style operations (e.g. update_max_model_len).

    def preprocess_add_request(self, request: EngineCoreRequest) -> tuple[Request, int]:
        if self.mm_receiver_cache is not None and request.mm_features:
            request.mm_features = self.mm_receiver_cache.get_and_update_features(
                request.mm_features
            )

        req = Request.from_engine_core_request(request, self.request_block_hasher)
        if req.use_structured_output:
            self.structured_output_manager.grammar_init(req)
        return req, request.current_wave

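Request preprocessing runs in the input-processing thread, in parallel with the engine loop:

  1. Multimodal feature cache: fetch/update multimodal features from the receiver cache (thread-safe because only this thread accesses it)
  2. Conversion to an internal Request: Request.from_engine_core_request turns the EngineCoreRequest into the Request object used by the scheduler
  3. Structured output initialization: if the request needs constrained output, initialize the grammar compiler (compilation is asynchronous; the scheduler checks the compilation status)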

3.2.12 EEP (elastic EP) methods (lines 577-583)
    def _eep_scale_up_before_kv_init(self):
        raise NotImplementedError

    def _eep_send_engine_core_notification(
        self,
        notification_type: EEPNotificationType,
        vllm_config: VllmConfig | None = None,
    ):
        raise NotImplementedError

Base-class placeholders: overridden in DPEngineCoreProc.


3.3 EngineShutdownState enum (lines 585-588)

class EngineShutdownState(IntEnum):
    RUNNING = 0
    REQUESTED = 1
    SHUTTING_DOWN = 2

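Shutdown state machine:

RUNNING ──(SIGTERM/SIGINT)──→ REQUESTED ──(handled)──→ SHUTTING_DOWN ──(done)──→ exit

| State | Meaning |
|---|---|
| RUNNING=0 | normal operation |
| REQUESTED=1 | shutdown signal received, about to be handled |
| SHUTTING_DOWN=2 | draining/aborting requests |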

3.4 EngineCoreProc class: the subprocess engine

3.4.1 __init__ method (lines 591-700)

Pseudocode:

EngineCoreProc.__init__(vllm_config, local_client, handshake_address, ...):
    1. create input_queue, output_queue
    2. set executor_fail_callback (→ input_queue.put EXECUTOR_FAILED)
    3. set engine_index, identity
    4. [optional] initialize TensorIpcReceiver
    5. perform the ZMQ handshake → obtain addresses
    6. [EEP] send the init-ready notification
    7. configure the data-parallel environment
    8. call super().__init__() (EngineCore initialization)
    9. start the input/output socket threads
    10. wait for the DP coordinator to become ready
class EngineCoreProc(EngineCore):
    ENGINE_CORE_DEAD = b"ENGINE_CORE_DEAD"
    addresses: EngineZmqAddresses
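  • ENGINE_CORE_DEAD: engine-death sentinel bytes; the output thread forwards it to all sockets and exits when it sees it
  • addresses: class-level annotation for the ZMQ address configuration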
    @instrument(span_name="EngineCoreProc init")
    def __init__(
        self,
        vllm_config: VllmConfig,
        local_client: bool,
        handshake_address: str,
        executor_class: type[Executor],
        log_stats: bool,
        client_handshake_address: str | None = None,
        tensor_queue: Queue | None = None,
        *,
        engine_index: int = 0,
    ):
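| Parameter | Meaning |
|---|---|
| local_client | whether the frontend runs on the same machine |
| handshake_address | main handshake ZMQ address |
| client_handshake_address | local frontend handshake address (DP>1 with hybrid LB) |
| tensor_queue | inter-process tensor-sharing queue |
| engine_index | engine index (DP scenarios) |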
        self.input_queue = queue.Queue[tuple[EngineCoreRequestType, Any]]()
        self.output_queue = queue.Queue[tuple[int, EngineCoreOutputs] | bytes]()
        executor_fail_callback = lambda: self.input_queue.put_nowait(
            (EngineCoreRequestType.EXECUTOR_FAILED, b"")
        )

Dual-queue architecture:

  • input_queue: a queue of (request_type, request_data) tuples
  • output_queue: (client_index, EngineCoreOutputs) tuples or the ENGINE_CORE_DEAD bytes

When the executor fails, it notifies the main loop through input_queue.

        self.engine_index = engine_index
        identity = self.engine_index.to_bytes(length=2, byteorder="little")
        self.engines_running = False
        self.shutdown_state = EngineShutdownState.RUNNING

ZMQ identity: the engine index encoded as 2 little-endian bytes, used to identify the DEALER socket.
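The identity encoding is plain int.to_bytes. The helper name below is hypothetical, but the call matches the excerpt. Two bytes cap the index at 65535; larger values raise OverflowError.

```python
def engine_identity(engine_index: int) -> bytes:
    # 2 little-endian bytes, as in EngineCoreProc.__init__
    return engine_index.to_bytes(length=2, byteorder="little")
```

For example, engine 258 (0x0102) becomes b"\x02\x01", with the low byte first.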

        with self._perform_handshakes(
            handshake_address, identity, local_client, vllm_config,
            client_handshake_address,
        ) as addresses:
            self.has_coordinator = addresses.coordinator_output is not None
            self.frontend_stats_publish_address = (
                addresses.frontend_stats_publish_address
            )
            internal_dp_balancing = (
                self.has_coordinator
                and not vllm_config.parallel_config.data_parallel_external_lb
            )
            self.publish_dp_lb_stats = internal_dp_balancing
            self.addresses = addresses
            self.process_input_queue_block = True

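Handshake results:

  • has_coordinator: whether a DP coordinator exists
  • publish_dp_lb_stats: whether to publish load statistics to the coordinator (internal DP load balancing only)
  • process_input_queue_block: whether input_queue.get() blocks (set to False for EEP)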
            if envs.VLLM_ELASTIC_EP_SCALE_UP_LAUNCH:
                self._eep_send_engine_core_notification(
                    EEPNotificationType.NEW_CORE_ENGINES_INIT_READY,
                    vllm_config=vllm_config,
                )
            self._init_data_parallel(vllm_config)

EEP notification + DP initialization: both happen before super().__init__().

            super().__init__(
                vllm_config,
                executor_class,
                log_stats,
                executor_fail_callback,
                internal_dp_balancing,
            )

Parent initializer: internal_dp_balancing is passed as include_finished_set, because internal DP load balancing needs to track the set of finished requests.

            ready_event = threading.Event()
            input_thread = threading.Thread(
                target=self.process_input_sockets,
                args=(addresses.inputs, addresses.coordinator_input, identity, ready_event),
                daemon=True,
            )
            input_thread.start()

            self.output_thread = threading.Thread(
                target=self.process_output_sockets,
                args=(addresses.outputs, addresses.coordinator_output, self.engine_index),
                daemon=True,
            )
            self.output_thread.start()

            while not ready_event.wait(timeout=10):
                if not input_thread.is_alive():
                    raise RuntimeError("Input socket thread died during startup")
                assert addresses.coordinator_input is not None
                logger.info("Waiting for READY message from DP Coordinator...")

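I/O thread startup:

  1. Input thread: ZMQ socket → input_queue
  2. Output thread: output_queue → ZMQ socket
  3. Wait for ready_event: the input thread sets it once it is ready, which includes receiving the DP coordinator's READY message. While waiting, the loop checks every 10 s that the input thread is still alive.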
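The executor_fail_callback set up in __init__ turns an executor failure into an ordinary queue message, so the busy loop raises on its own thread. A minimal sketch follows. RequestType is a hypothetical two-member subset of EngineCoreRequestType, and the helper names are illustrative.

```python
import queue
from enum import Enum


class RequestType(Enum):  # hypothetical subset of EngineCoreRequestType
    ADD = "add"
    EXECUTOR_FAILED = "executor_failed"


def make_engine_queues():
    input_queue = queue.Queue()
    output_queue = queue.Queue()

    def executor_fail_callback():
        # Only enqueues a marker; the busy loop raises when it dequeues it.
        input_queue.put_nowait((RequestType.EXECUTOR_FAILED, b""))

    return input_queue, output_queue, executor_fail_callback


def handle_client_request(request_type, request):
    if request_type is RequestType.EXECUTOR_FAILED:
        raise RuntimeError("Executor failed.")
    return request
```

Because the marker goes through the same FIFO queue, requests that arrived before the failure are still handled in order.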

3.4.2 Handshake mechanism (lines 702-780)
    @contextmanager
    def _perform_handshakes(
        self, handshake_address, identity, local_client, vllm_config,
        client_handshake_address,
    ) -> Generator[EngineZmqAddresses, None, None]:
        input_ctx = zmq.Context()
        is_local = local_client and client_handshake_address is None
        headless = not local_client
        handshake = self._perform_handshake(
            input_ctx, handshake_address, identity, is_local, headless,
            vllm_config, vllm_config.parallel_config,
        )
        if client_handshake_address is None:
            with handshake as addresses:
                yield addresses
        else:
            # Double handshake: rank 0 + local frontend
            local_handshake = self._perform_handshake(
                input_ctx, client_handshake_address, identity, True, False, vllm_config
            )
            with handshake as addresses, local_handshake as client_addresses:
                addresses.inputs = client_addresses.inputs
                addresses.outputs = client_addresses.outputs
                yield addresses
        vllm_config.__post_init__()

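Handshake scenarios:

| Scenario | Handshakes | Description |
|---|---|---|
| DP=1 / offline | 1 | with the co-located frontend |
| DP>1 + internal LB | 1 | with the shared frontend |
| DP>1 + external/hybrid LB | 2 | rank-0 frontend + local frontend |

The second handshake merges the local frontend's input/output addresses into the main addresses.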

    @contextmanager
    def _perform_handshake(
        self, ctx, handshake_address, identity, local_client, headless,
        vllm_config, parallel_config_to_update=None,
    ) -> Generator[EngineZmqAddresses, None, None]:
        with make_zmq_socket(
            ctx, handshake_address, zmq.DEALER, identity=identity,
            linger=5000, bind=False,
        ) as handshake_socket:
            addresses = self.startup_handshake(
                handshake_socket, local_client, headless, parallel_config_to_update
            )
            yield addresses
            ready_msg = {
                "status": "READY",
                "local": local_client,
                "headless": headless,
            }
            if vllm_config.parallel_config.data_parallel_size > 1:
                ready_msg["parallel_config_hash"] = (
                    vllm_config.parallel_config.compute_hash()
                )
            handshake_socket.send(msgspec.msgpack.encode(ready_msg))

Handshake flow:

Engine → Frontend: {"status": "HELLO", "local": ..., "headless": ...}
Frontend → Engine: EngineHandshakeMetadata (ZMQ addresses + parallel config)
... (engine initialization) ...
Engine → Frontend: {"status": "READY", "local": ..., "headless": ..., "parallel_config_hash": ...}

With DP>1, the READY message also carries parallel_config_hash, which is used to check that the parallel configurations are consistent.
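The message sequence can be simulated without ZMQ. In this sketch, queue.Queue pairs stand in for the DEALER/ROUTER sockets and JSON stands in for msgpack. Both functions, the address strings and the hash value are hypothetical; only the HELLO → metadata → READY order and the DP>1 hash rule come from the text above.

```python
import json
import queue
import threading


def engine_side(to_frontend, from_frontend, dp_size, config_hash):
    """Hypothetical engine half of the HELLO / metadata / READY exchange."""
    to_frontend.put(json.dumps({"status": "HELLO", "local": True, "headless": False}))
    metadata = json.loads(from_frontend.get(timeout=5))   # addresses + parallel config
    # ... engine initialization would happen here ...
    ready = {"status": "READY", "local": True, "headless": False}
    if dp_size > 1:
        ready["parallel_config_hash"] = config_hash       # lets the frontend check consistency
    to_frontend.put(json.dumps(ready))
    return metadata["addresses"]


def frontend_side(to_engine, from_engine, addresses):
    """Hypothetical frontend half: answer HELLO, then wait for READY."""
    hello = json.loads(from_engine.get(timeout=5))
    assert hello["status"] == "HELLO"
    to_engine.put(json.dumps({"addresses": addresses, "parallel_config": {}}))
    return json.loads(from_engine.get(timeout=5))
```

Running the engine half on a thread and the frontend half on the caller's thread completes the exchange.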

    @staticmethod
    def startup_handshake(
        handshake_socket, local_client, headless, parallel_config=None,
    ) -> EngineZmqAddresses:
        handshake_socket.send(
            msgspec.msgpack.encode({"status": "HELLO", "local": local_client, "headless": headless})
        )
        if not handshake_socket.poll(timeout=HANDSHAKE_TIMEOUT_MINS * 60_000):
            raise RuntimeError(...)
        init_bytes = handshake_socket.recv()
        init_message: EngineHandshakeMetadata = msgspec.msgpack.decode(
            init_bytes, type=EngineHandshakeMetadata
        )
        if parallel_config is not None:
            for key, value in init_message.parallel_config.items():
                setattr(parallel_config, key, value)
        return init_message.addresses

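Handshake details: send HELLO → wait up to HANDSHAKE_TIMEOUT_MINS (5) minutes for a reply (poll() takes milliseconds, hence × 60_000) → receive the init message → update parallel_config → return the addresses.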

3.4.3 run_engine_core static method (lines 782-830)
    @staticmethod
    def run_engine_core(*args, dp_rank: int = 0, local_dp_rank: int = 0, **kwargs):
        """Launch EngineCore busy loop in background process."""
        maybe_register_config_serialize_by_value()
        engine_core: EngineCoreProc | None = None
        signal_callback: SignalCallback | None = None
        try:
            vllm_config = kwargs["vllm_config"]
            parallel_config = vllm_config.parallel_config
            data_parallel = parallel_config.data_parallel_size > 1 or dp_rank > 0

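Entry point, called inside the child process:

  1. maybe_register_config_serialize_by_value(): makes sure transformer configs remain serializable after spawn
  2. choose the process title and engine type based on the DP configuration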
            if data_parallel and vllm_config.model_config.is_moe:
                parallel_config.data_parallel_rank = dp_rank
                engine_core = DPEngineCoreProc(*args, **kwargs)
            else:
                parallel_config.data_parallel_size = 1
                parallel_config.data_parallel_size_local = 1
                parallel_config.data_parallel_rank = 0
                engine_core = EngineCoreProc(*args, engine_index=dp_rank, **kwargs)

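DP engine selection logic:

| Condition | Engine type | Notes |
|---|---|---|
| DP>1 + MoE | DPEngineCoreProc | needs DP all-reduce synchronization |
| DP>1 + non-MoE | EngineCoreProc(engine_index=dp_rank) | fully independent, treated as DP=1 |
| DP=1 | EngineCoreProc(engine_index=0) | single engine |

Key insight: the DP ranks of a non-MoE model run completely independently (no synchronization needed), so data_parallel_size is reset to 1.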
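The selection rule can be written as a function over a config object. ParallelConfig below is a hypothetical dataclass with only the three fields the excerpt touches, and the function returns the class name instead of constructing an engine. The branch conditions and field resets are copied from the excerpt.

```python
from dataclasses import dataclass


@dataclass
class ParallelConfig:  # hypothetical subset of vLLM's parallel config
    data_parallel_size: int = 1
    data_parallel_size_local: int = 1
    data_parallel_rank: int = 0


def select_engine(parallel_config, is_moe, dp_rank=0):
    data_parallel = parallel_config.data_parallel_size > 1 or dp_rank > 0
    if data_parallel and is_moe:
        parallel_config.data_parallel_rank = dp_rank
        return "DPEngineCoreProc"
    # Non-MoE DP ranks are independent: each behaves like a DP=1 engine.
    parallel_config.data_parallel_size = 1
    parallel_config.data_parallel_size_local = 1
    parallel_config.data_parallel_rank = 0
    return "EngineCoreProc"
```

Note that a MoE model with DP=1 still gets a plain EngineCoreProc.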

            def wakeup_engine():
                engine_core.input_queue.put_nowait((EngineCoreRequestType.WAKEUP, None))

            signal_callback = SignalCallback(wakeup_engine)

            def signal_handler(signum, frame):
                engine_core.shutdown_state = EngineShutdownState.REQUESTED
                signal_callback.trigger()

            signal.signal(signal.SIGTERM, signal_handler)
            signal.signal(signal.SIGINT, signal_handler)

            engine_core.run_busy_loop()

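Signal handling:

  1. SIGTERM/SIGINT → set the REQUESTED state and wake a possibly idle engine
  2. The handler itself only sets the state and calls signal_callback.trigger(). SignalCallback then runs wakeup_engine outside the signal-handler context, so the handler never enqueues into input_queue directly (safer)
  3. Enter the busy loop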
        except Exception as e:
            if engine_core is None:
                logger.exception("EngineCore failed to start.")
            else:
                logger.exception("EngineCore encountered a fatal error.")
                engine_core._send_engine_dead()
            raise e
        finally:
            signal.signal(signal.SIGTERM, signal.SIG_DFL)
            signal.signal(signal.SIGINT, signal.SIG_DFL)
            if signal_callback is not None:
                signal_callback.stop()
            if engine_core is not None:
                engine_core.shutdown()

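Exception handling: startup failures are distinguished from runtime errors. A runtime error sends the ENGINE_CORE_DEAD signal. The finally block restores the default signal handlers, stops the signal callback and shuts the engine down.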
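The excerpt does not show SignalCallback's internals. The sketch below is a stand-in built on one assumption: trigger() only wakes a helper thread, and that thread performs the real work (enqueuing WAKEUP). DeferredCallback, the string states and the "WAKEUP" tuple are hypothetical. The test calls the handler directly rather than installing it with signal.signal, which only works on the main thread.

```python
import queue
import threading


class DeferredCallback:
    """Hypothetical stand-in for SignalCallback: runs `fn` on a helper thread when triggered."""

    def __init__(self, fn):
        self._fn = fn
        self._event = threading.Event()
        self._stopped = False
        self._thread = threading.Thread(target=self._loop, daemon=True)
        self._thread.start()

    def _loop(self):
        while True:
            self._event.wait()
            self._event.clear()
            if self._stopped:
                return
            self._fn()

    def trigger(self):
        # The handler touches only this Event, never the engine's input_queue lock.
        self._event.set()

    def stop(self):
        self._stopped = True
        self._event.set()
        self._thread.join(timeout=1)


input_queue = queue.Queue()
state = {"shutdown": "RUNNING"}
callback = DeferredCallback(lambda: input_queue.put_nowait(("WAKEUP", None)))


def signal_handler(signum, frame):
    state["shutdown"] = "REQUESTED"
    callback.trigger()
```

In a real process the handler would be installed with signal.signal(signal.SIGTERM, signal_handler) on the main thread.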

3.4.4 run_busy_loop / _process_input_queue / _process_engine_step (lines 840-897)
    def has_work(self) -> bool:
        return (
            self.engines_running
            or self.scheduler.has_requests()
            or bool(self.batch_queue)
        )

    def is_running(self) -> bool:
        return self.shutdown_state == EngineShutdownState.RUNNING

    def run_busy_loop(self):
        while self._handle_shutdown():
            self._process_input_queue()
            self._process_engine_step()
        raise SystemExit

Busy-loop main logic:

while _handle_shutdown():
    _process_input_queue()    # process input until there is work
    _process_engine_step()    # run one step and emit outputs
raise SystemExit
    def _process_input_queue(self):
        waited = False
        while not self.has_work() and self.is_running():
            self._notify_idle_state_callbacks()
            if self.input_queue.empty():
                with self.aborts_queue.mutex:
                    self.aborts_queue.queue.clear()
                if logger.isEnabledFor(DEBUG):
                    logger.debug("EngineCore waiting for work.")
                    waited = True
            block = self.process_input_queue_block
            try:
                req = self.input_queue.get(block=block)
                self._handle_client_request(*req)
            except queue.Empty:
                break
            if not block:
                break

        if waited:
            logger.debug("EngineCore loop active.")

        while not self.input_queue.empty():
            req = self.input_queue.get_nowait()
            self._handle_client_request(*req)

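Input-processing logic:

  1. Wait for work: loop while has_work() is False, notifying the idle callbacks on each idle iteration
  2. Clear the abort queue: when idle and input_queue is empty, aborts_queue is cleared. No request is running, and every abort also arrives through input_queue, so nothing is lost.
  3. Blocking/non-blocking read: process_input_queue_block controls whether the read blocks
  4. Drain remaining requests: after leaving the wait loop, process every request still in the queue in one go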
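The two phases (wait for the first unit of work, then drain without blocking) can be sketched apart from the engine. In this sketch the has_work and handle_request callables are hypothetical. The is_running check, idle callbacks and abort clearing are omitted.

```python
import queue


def process_input_queue(input_queue, has_work, handle_request, block=True):
    """Sketch of _process_input_queue: wait for work, then drain what is queued."""
    # Phase 1: while idle, take one request at a time (blocking if requested).
    while not has_work():
        try:
            handle_request(*input_queue.get(block=block))
        except queue.Empty:
            break
        if not block:
            break
    # Phase 2: drain anything else that already arrived, without blocking.
    while not input_queue.empty():
        handle_request(*input_queue.get_nowait())
```

The first ADD makes has_work() true, which ends phase 1. The remaining messages are handled in arrival order during phase 2.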
    def _process_engine_step(self) -> bool:
        outputs, model_executed = self.step_fn()
        for output in outputs.items() if outputs else ():
            self.output_queue.put_nowait(output)
        self.post_step(model_executed)

        if not model_executed and self.scheduler.has_unfinished_requests():
            time.sleep(0.001)

        return model_executed

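Engine step:

  1. Run the step (schedule + execute + update)
  2. Put the outputs into output_queue (the output thread forwards them to ZMQ)
  3. Run post-step processing
  4. Key optimization: if the model did not run but unfinished requests remain (e.g. waiting for remote KV), sleep 1 ms to release the GIL so background threads (e.g. NIXL handshakes) can make progress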

3.4.5 _handle_shutdown / _handle_client_request (lines 899-970)
    def _handle_shutdown(self) -> bool:
        if self.shutdown_state == EngineShutdownState.RUNNING:
            return True
        if self.shutdown_state == EngineShutdownState.REQUESTED:
            shutdown_timeout = self.vllm_config.shutdown_timeout
            if shutdown_timeout == 0:
                aborted_reqs = self.scheduler.finish_requests(
                    None, RequestStatus.FINISHED_ABORTED
                )
                self._send_abort_outputs(aborted_reqs)
            else:
                logger.info("Draining %d in-flight requests (timeout=%ds)", ...)
            self.shutdown_state = EngineShutdownState.SHUTTING_DOWN
        if not self.has_work():
            logger.info("Shutdown complete")
            return False
        return True

Shutdown handling state machine:

RUNNING → return True (keep looping)
REQUESTED → timeout=0: abort all requests immediately; timeout>0: drain in-flight requests → SHUTTING_DOWN
SHUTTING_DOWN → has_work() ? True : False (exit)
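The transitions can be traced with a toy engine. ToyEngine and its in_flight set are hypothetical, and the drain timeout itself is not enforced here (the excerpt does not show where it is enforced). The state enum and branch structure follow the excerpt.

```python
from enum import IntEnum


class EngineShutdownState(IntEnum):
    RUNNING = 0
    REQUESTED = 1
    SHUTTING_DOWN = 2


class ToyEngine:
    """Hypothetical engine reproducing the _handle_shutdown transitions."""

    def __init__(self, in_flight, shutdown_timeout):
        self.in_flight = set(in_flight)
        self.shutdown_timeout = shutdown_timeout
        self.shutdown_state = EngineShutdownState.RUNNING
        self.aborted = []

    def has_work(self):
        return bool(self.in_flight)

    def handle_shutdown(self):
        if self.shutdown_state == EngineShutdownState.RUNNING:
            return True
        if self.shutdown_state == EngineShutdownState.REQUESTED:
            if self.shutdown_timeout == 0:
                self.aborted = sorted(self.in_flight)   # abort everything at once
                self.in_flight.clear()
            self.shutdown_state = EngineShutdownState.SHUTTING_DOWN
        return self.has_work()   # keep looping only while requests drain
```

With timeout 0, the loop exits on the same call that handles REQUESTED. With a positive timeout, it keeps stepping until the in-flight set empties.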
    def _handle_client_request(self, request_type, request):
        if request_type == EngineCoreRequestType.WAKEUP:
            return
        elif request_type == EngineCoreRequestType.ADD:
            req, request_wave = request
            if self._reject_add_in_shutdown(req):
                return
            self.add_request(req, request_wave)
        elif request_type == EngineCoreRequestType.ABORT:
            self.abort_requests(request)
        elif request_type == EngineCoreRequestType.UTILITY:
            client_idx, call_id, method_name, args = request
            if self._reject_utility_in_shutdown(client_idx, call_id, method_name):
                return
            output = UtilityOutput(call_id)
            get_result = lambda: (
                (method := getattr(self, method_name))
                and method(*self._convert_msgspec_args(method, args))
            )
            enqueue_output = lambda out: self.output_queue.put_nowait(
                (client_idx, EngineCoreOutputs(utility_output=out))
            )
            self._invoke_utility_method(method_name, get_result, output, enqueue_output)
        elif request_type == EngineCoreRequestType.EXECUTOR_FAILED:
            raise RuntimeError("Executor failed.")

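Request dispatch logic:

| Type | Handling |
|---|---|
| WAKEUP | ignored (only used to wake an idle engine) |
| ADD | rejected during shutdown; otherwise add_request (preprocessing already ran in the input thread) |
| ABORT | abort_requests |
| UTILITY | rejected during shutdown; otherwise invoke the method by reflection and wrap the result |
| EXECUTOR_FAILED | raise RuntimeError |

Utility RPC mechanism: getattr(self, method_name) looks the method up dynamically, so any engine method can be called remotely.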

3.4.6 _invoke_utility_method / _convert_msgspec_args (lines 972-1003)
    @staticmethod
    def _invoke_utility_method(name, get_result, output, enqueue_output):
        try:
            result = get_result()
            if isinstance(result, Future):
                callback = lambda future: EngineCoreProc._invoke_utility_method(
                    name, future.result, output, enqueue_output
                )
                result.add_done_callback(callback)
                return
            output.result = UtilityResult(result)
        except Exception as e:
            output.failure_message = f"Call to {name} method failed: {str(e)}"
        enqueue_output(output)

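Future handling: if the utility method returns a Future, _invoke_utility_method does not block. It registers a done-callback that calls itself again with future.result as get_result, so the reply is enqueued once the Future completes. This is what supports asynchronous methods such as pause_scheduler.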
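The non-blocking Future path and the getattr dispatch can be combined in one small sketch. UtilityOutput here is a minimal hypothetical class, not vLLM's, and the result is stored directly rather than wrapped in UtilityResult. ToyEngine is illustrative.

```python
from concurrent.futures import Future


class UtilityOutput:  # hypothetical minimal stand-in
    def __init__(self, call_id):
        self.call_id = call_id
        self.result = None
        self.failure_message = None


def invoke_utility_method(name, get_result, output, enqueue_output):
    try:
        result = get_result()
        if isinstance(result, Future):
            # Do not block the busy loop: finish once the Future resolves.
            result.add_done_callback(
                lambda fut: invoke_utility_method(name, fut.result, output, enqueue_output)
            )
            return
        output.result = result
    except Exception as e:
        output.failure_message = f"Call to {name} method failed: {e}"
    enqueue_output(output)


class ToyEngine:
    def add(self, a, b):
        return a + b

    def pause(self):
        self.pause_future = Future()
        return self.pause_future
```

A synchronous method replies at once. An asynchronous one replies only when its Future completes. A missing method becomes a failure message instead of an exception in the busy loop.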

    @staticmethod
    def _convert_msgspec_args(method, args):
        if not args:
            return args
        arg_types = signature(method).parameters.values()
        assert len(args) <= len(arg_types)
        return tuple(
            msgspec.convert(v, type=p.annotation)
            if isclass(p.annotation)
            and issubclass(p.annotation, msgspec.Struct)
            and not isinstance(v, p.annotation)
            else v
            for v, p in zip(args, arg_types)
        )

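Automatic msgspec conversion: using runtime inspection of the method signature, an argument is converted into a msgspec.Struct instance whenever its parameter annotation is a Struct subclass and the value is not already an instance of it. This is needed because msgpack deserialization produces plain dicts/lists by default, while the method signatures expect Structs.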
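The same signature-driven conversion can be shown with the standard library. This sketch substitutes dataclasses for msgspec.Struct, and p.annotation(**v) for msgspec.convert. The isinstance(v, dict) test simplifies the original "not already an instance" check. AdapterSpec and load are hypothetical.

```python
from dataclasses import dataclass, is_dataclass
from inspect import isclass, signature


@dataclass
class AdapterSpec:  # hypothetical struct standing in for a msgspec.Struct
    name: str
    rank: int


def convert_args(method, args):
    """Turn dict arguments into dataclass instances when the parameter expects one."""
    if not args:
        return args
    params = list(signature(method).parameters.values())
    assert len(args) <= len(params)
    return tuple(
        p.annotation(**v)
        if isclass(p.annotation) and is_dataclass(p.annotation) and isinstance(v, dict)
        else v
        for v, p in zip(args, params)
    )


def load(spec: AdapterSpec, pin: bool = False):
    return spec, pin
```

Only the parameter annotated with the struct type is converted; the plain bool passes through unchanged.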

3.4.7 _send_engine_dead (lines 1005-1017)
    def _send_engine_dead(self):
        self.output_queue.put_nowait(EngineCoreProc.ENGINE_CORE_DEAD)
        self.output_thread.join(timeout=5.0)
        if self.output_thread.is_alive():
            logger.fatal(
                "vLLM shutdown signal from EngineCore failed to send."
            )

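Death signal: put ENGINE_CORE_DEAD on the output queue and wait for the output thread to send it and exit. If the thread is still alive after the 5-second timeout, the message failed to send (possibly because the ZMQ connection is broken).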

3.4.8 process_input_sockets: the input thread (lines 1019-1096)

Pseudocode:

process_input_sockets(input_addresses, coord_input_address, identity, ready_event):
    1. create add_request_decoder (with Tensor IPC) and generic_decoder
    2. connect all input DEALER sockets + the coordinator XSUB socket
    3. send ready_payload (EngineCoreReadyResponse) on each input socket
    4. [DP] wait for the coordinator's READY message
    5. set ready_event
    6. loop:
        a. ZMQ poll for messages
        b. receive a multipart message: (type_frame, *data_frames)
        c. deserialize according to request_type
        d. [ADD] preprocess_add_request → input_queue
        e. [ABORT] put into both aborts_queue and input_queue
        f. [other] deserialize → input_queue

Key details:

        add_request_decoder = MsgpackDecoder(
            EngineCoreRequest, oob_tensor_provider=self.tensor_ipc_receiver
        )
        generic_decoder = MsgpackDecoder(oob_tensor_provider=self.tensor_ipc_receiver)

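Out-of-band tensor provider: oob_tensor_provider lets tensor data travel through a separate inter-process queue instead of being serialized into msgpack, which enables zero-copy tensor sharing.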

        ready_response = EngineCoreReadyResponse(
            max_model_len=self.vllm_config.model_config.max_model_len,
            num_gpu_blocks=self.vllm_config.cache_config.num_gpu_blocks or 0,
            dp_stats_address=self.frontend_stats_publish_address,
        )
        ready_payload = msgspec.msgpack.encode(ready_response)
        for input_socket in input_sockets:
            input_socket.send(ready_payload)

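First message: send an EngineCoreReadyResponse on each input socket. ZMQ's ROUTER-DEALER pattern requires this: the DEALER must send first so that the ROUTER learns its identity and can route messages back to it.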

                    if request_type == EngineCoreRequestType.ABORT:
                        self.aborts_queue.put_nowait(request)

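Double enqueue for aborts: an abort goes into both aborts_queue, so the core loop can act on it promptly between steps, and input_queue, which preserves ordering and avoids leaked requests. Aborts are idempotent in the scheduler, so enqueuing twice is safe.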

3.4.9 process_output_sockets: the output thread (lines 1098-1159)

Pseudocode:

process_output_sockets(output_paths, coord_output_path, engine_index):
    1. create a MsgpackEncoder + a list of reusable buffers
    2. connect the PUSH socket(s) + the coordinator PUSH socket
    3. loop:
        a. get an output from output_queue
        b. [ENGINE_CORE_DEAD] send it to all sockets and exit
        c. set engine_index
        d. [client_index == -1] send to the coordinator
        e. [otherwise] encode + zero-copy send to the matching client socket
        f. manage buffer reuse

Zero-copy send optimization:

        reuse_buffers: list[bytearray] = []
        pending = deque[tuple[zmq.MessageTracker, Any, bytearray]]()
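  • reuse_buffers: buffers whose send has completed and which can be reused
  • pending: references to buffers that ZMQ is still transmitting, kept to prevent premature release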
                buffer = reuse_buffers.pop() if reuse_buffers else bytearray()
                buffers = encoder.encode_into(outputs, buffer)
                tracker = sockets[client_index].send_multipart(
                    buffers, copy=False, track=True
                )

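encode_into encodes directly into a preallocated buffer, and copy=False + track=True turn the send into a zero-copy operation. The MessageTracker reports when ZMQ has finished sending, after which the buffer can be reused.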
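The excerpt shows the two containers but not the exact recycling code. The sketch below is one plausible bookkeeping scheme under that assumption. FakeTracker stands in for zmq.MessageTracker (whose done property reports send completion), and BufferPool is hypothetical. The encoder is assumed to overwrite a reused buffer.

```python
from collections import deque


class FakeTracker:
    """Hypothetical stand-in for zmq.MessageTracker."""

    def __init__(self):
        self.done = False


class BufferPool:
    def __init__(self):
        self.reuse_buffers = []      # buffers safe to overwrite
        self.pending = deque()       # (tracker, buffer) still owned by the transport

    def acquire(self):
        self._reclaim()
        return self.reuse_buffers.pop() if self.reuse_buffers else bytearray()

    def sent(self, tracker, buffer):
        if tracker.done:
            self.reuse_buffers.append(buffer)
        else:
            self.pending.appendleft((tracker, buffer))

    def _reclaim(self):
        # Oldest sends are on the right; stop at the first one still in flight.
        while self.pending and self.pending[-1][0].done:
            self.reuse_buffers.append(self.pending.pop()[1])
```

A buffer is handed out again only after its tracker reports completion. Until then, a fresh bytearray is allocated.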

3.4.10 pause_scheduler (process mode) (lines 1161-1195)
    def pause_scheduler(self, mode="abort", clear_cache=True) -> Future | None:
        def engine_idle_callback(engine, future):
            if clear_cache:
                engine._reset_caches()
            future.set_result(None)

        if mode == "abort":
            aborted_reqs = self.scheduler.finish_requests(
                None, RequestStatus.FINISHED_ABORTED
            )
            self._send_abort_outputs(aborted_reqs)

        pause_state = PauseState.PAUSED_ALL if mode == "keep" else PauseState.PAUSED_NEW
        self.scheduler.set_pause_state(pause_state)

        if self._pause_complete():
            if clear_cache:
                self._reset_caches()
            return None

        future = Future[Any]()
        self._idle_state_callbacks.append(partial(engine_idle_callback, future=future))
        return future

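Differences from EngineCore.pause_scheduler:

| Feature | EngineCore | EngineCoreProc |
|---|---|---|
| "wait" mode | ❌ not supported | ✅ supported |
| abort outputs sent on "abort" | ❌ | ✅ _send_abort_outputs |
| asynchronous Future | ❌ always returns None | ✅ returns a Future (completed when idle) |

How "wait" mode works: it sets PAUSED_NEW, so new requests wait in the queue while in-flight requests keep stepping. When the engine becomes idle, the idle callback fires, clears the caches and completes the Future.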
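The idle-callback mechanism can be sketched with a toy engine. ToyProcEngine and its finish() method are hypothetical. That the callback list is emptied after being notified is an assumption. The engine_idle_callback body and the partial(..., future=future) registration mirror the excerpt.

```python
from concurrent.futures import Future
from functools import partial


class ToyProcEngine:
    """Hypothetical engine showing how a 'wait' pause completes via idle callbacks."""

    def __init__(self, in_flight):
        self.in_flight = set(in_flight)
        self.pause_state = "UNPAUSED"
        self.cache_cleared = False
        self._idle_state_callbacks = []

    def _reset_caches(self):
        self.cache_cleared = True

    def pause_scheduler(self, clear_cache=True):
        def engine_idle_callback(engine, future):
            if clear_cache:
                engine._reset_caches()
            future.set_result(None)

        self.pause_state = "PAUSED_NEW"     # stop admitting new work
        if not self.in_flight:              # already idle: finish synchronously
            if clear_cache:
                self._reset_caches()
            return None
        future = Future()
        self._idle_state_callbacks.append(partial(engine_idle_callback, future=future))
        return future

    def finish(self, req_id):
        self.in_flight.discard(req_id)
        if not self.in_flight:              # the busy loop would notify idle callbacks here
            callbacks, self._idle_state_callbacks = self._idle_state_callbacks, []
            for cb in callbacks:
                cb(self)
```

The caller gets a pending Future while requests are in flight. The Future resolves, and the caches are cleared, only once the last request finishes.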

3.4.11 Output-sending helpers (lines 1197-1230)
    def _send_finish_outputs_to_client(
        self, req_ids, client_index, finish_reason
    ) -> None:
        outputs = [
            EngineCoreOutput(req_id, [], finish_reason=finish_reason)
            for req_id in req_ids
        ]
        eco = EngineCoreOutputs(finished_requests=req_ids, outputs=outputs)
        self.output_queue.put_nowait((client_index, eco))

    def _send_abort_outputs_to_client(self, req_ids, client_index):
        self._send_finish_outputs_to_client(req_ids, client_index, FinishReason.ABORT)

    def _send_error_outputs_to_client(self, req_ids, client_index):
        self._send_finish_outputs_to_client(req_ids, client_index, FinishReason.ERROR)

    def _send_abort_outputs(self, aborted_reqs):
        if aborted_reqs:
            by_client = defaultdict[int, set[str]](set)
            for req_id, client_index in aborted_reqs:
                by_client[client_index].add(req_id)
            for client_index, req_ids in by_client.items():
                self._send_abort_outputs_to_client(list(req_ids), client_index)

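Routing outputs by client

  • by_client: groups the aborted requests by client_index
  • each client receives its own message, so outputs are routed to the correct frontend
  • a single batch of aborted requests may span several clients (multi-frontend deployments)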
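The grouping step in `_send_abort_outputs` can be reproduced in isolation. `group_aborts_by_client` is a hypothetical helper; unlike the excerpt, which passes `list(req_ids)` straight from a set, it sorts the IDs so the result is deterministic.

```python
from collections import defaultdict
from typing import DefaultDict, Dict, List, Set, Tuple


def group_aborts_by_client(aborted_reqs: List[Tuple[str, int]]) -> Dict[int, List[str]]:
    """Group (req_id, client_index) pairs so each frontend gets one message."""
    by_client: DefaultDict[int, Set[str]] = defaultdict(set)
    for req_id, client_index in aborted_reqs:
        by_client[client_index].add(req_id)
    # Sorting is only for deterministic output; the set already removed duplicates.
    return {client: sorted(ids) for client, ids in sorted(by_client.items())}
```

For example, `[("a", 0), ("b", 1), ("c", 0)]` yields one abort message for client 0 (requests a and c) and one for client 1 (request b).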

3.5 The DPEngineCoreProc class: data-parallel engine (MoE)

3.5.1 The __init__ method (lines 1233-1270)
class DPEngineCoreProc(EngineCoreProc):
    def __init__(self, vllm_config, local_client, handshake_address, ...):
        assert vllm_config.model_config.is_moe, (
            "DPEngineCoreProc should only be used for MoE models"
        )
        self.step_counter = 0
        self.current_wave = 0
        self.last_counts = (0, 0)
        self.pending_pause = False
        self.ignore_start_dp_wave = False
        self.eep_scaling_state: ElasticEPScalingState | None = None

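MoE-specific state:

Field | Meaning
step_counter | forward-pass step counter (DP state is synchronized once every 32 steps)
current_wave | current DP wave number
last_counts | last published request counts (avoids re-sending unchanged counts)
pending_pause | whether a pause is pending
ignore_start_dp_wave | whether to ignore START_DP_WAVE messages (after pause consensus)
eep_scaling_state | elastic EP scale-up/scale-down state machine

3.5.2 The _init_data_parallel method (lines 1272-1285)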
    def _init_data_parallel(self, vllm_config: VllmConfig):
        parallel_config = vllm_config.parallel_config
        dp_rank = parallel_config.data_parallel_rank
        dp_size = parallel_config.data_parallel_size
        local_dp_rank = parallel_config.data_parallel_rank_local

        assert dp_size > 1
        assert local_dp_rank is not None
        assert 0 <= local_dp_rank <= dp_rank < dp_size

        self.dp_rank = dp_rank
        self.dp_size = dp_size
        dp_group, dp_store = parallel_config.stateless_init_dp_group(return_store=True)
        self.dp_group, self.dp_store = dp_group, dp_store

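DP initialization: creates a stateless process group used for all-reduce synchronization. Each MoE DP rank processes its own subset of requests independently, but the ranks must periodically agree on whether any rank still has unfinished requests.

3.5.3 add_request / resume_scheduler: DP wave management (lines 1291-1330)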
    def add_request(self, request: Request, request_wave: int = 0):
        super().add_request(request, request_wave)
        if self.has_coordinator and request_wave != self.current_wave:
            if request_wave > self.current_wave:
                self.current_wave = request_wave
            elif (
                not self.engines_running
                and self.scheduler.pause_state == PauseState.UNPAUSED
            ):
                self.engines_running = True
                self.output_queue.put_nowait(
                    (-1, EngineCoreOutputs(start_wave=self.current_wave))
                )

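DP wave tracking

  • request_wave > current_wave: the request belongs to a newer wave, so the local counter is advanced
  • request_wave < current_wave (a request for an already-completed wave) while the engine is idle and unpaused: set engines_running = True and send start_wave=current_wave on client index -1 (the DP coordinator) to signal that a new wave must start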
    def resume_scheduler(self):
        if self.pending_pause or (self.engines_running and self.ignore_start_dp_wave):
            raise RuntimeError(...)
        if self.engines_running:
            return
        super().resume_scheduler()
        self.ignore_start_dp_wave = False
        has_global_unfinished = ParallelConfig.has_unfinished_dp(
            self.dp_group, self.scheduler.has_unfinished_requests()
        )
        if has_global_unfinished:
            self.engines_running = True

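Resuming the scheduler requires synchronizing with every DP rank: an all-reduce (ParallelConfig.has_unfinished_dp) checks whether any rank still has unfinished requests, and only then is engines_running set to True.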
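The wave branch of `add_request` can be isolated as a small decision function. This sketch assumes a coordinator is present (`has_coordinator` is True); `WaveState` and `on_add_request` are hypothetical names, and the returned integer stands for the `start_wave` value the real code enqueues on client index -1.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class WaveState:
    current_wave: int = 0
    engines_running: bool = False
    paused: bool = False


def on_add_request(state: WaveState, request_wave: int) -> Optional[int]:
    """Return the wave to announce via start_wave, or None if nothing is sent."""
    if request_wave == state.current_wave:
        return None
    if request_wave > state.current_wave:
        # Request from a newer wave: catch the local counter up.
        state.current_wave = request_wave
        return None
    if not state.engines_running and not state.paused:
        # Stale wave while idle: wake up and ask for a new wave to start.
        state.engines_running = True
        return state.current_wave
    return None
```

A stale request only triggers a `start_wave` message once; while the engines are already running, later stale requests are simply scheduled.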

3.5.4 run_busy_loop: the DP core loop (lines 1342-1390)
    def run_busy_loop(self):
        while self._handle_shutdown():
            self._process_input_queue()

            if self.eep_scaling_state is not None:
                _ = self.eep_scaling_state.progress()
                if self.eep_scaling_state.is_complete():
                    if self.eep_scaling_state.worker_type == "removing":
                        raise SystemExit
                    self.process_input_queue_block = True
                    self.eep_scaling_state = None

            executed = self._process_engine_step()
            self._maybe_publish_request_counts()

            local_unfinished_reqs = self.scheduler.has_unfinished_requests()
            if not executed:
                if not local_unfinished_reqs and not self.engines_running:
                    continue
                self.execute_dummy_batch()

            self.engines_running = self._has_global_unfinished_reqs(
                local_unfinished_reqs
            )

            if not self.engines_running:
                client_index = -1 if self.has_coordinator else 0
                self.output_queue.put_nowait(
                    (client_index, EngineCoreOutputs(wave_complete=self.current_wave))
                )
                self.current_wave += 1
                self.step_counter = 0

        raise SystemExit

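Key differences between the DP busy loop and the single-engine busy loop:

Feature | EngineCoreProc | DPEngineCoreProc
When idle | blocks waiting on input_queue | runs a dummy batch (while other ranks are still busy) to keep the DP ranks in sync
When requests finish | stops on its own | needs all-reduce consensus
Wave management |  | yes (wave_complete / start_wave)
EEP handling |  | checks the scaling state on every loop iteration

Why the dummy batch is needed: when some DP ranks have requests and others do not, the idle ranks must execute an empty batch so that they still take part in the collective operations (such as the all-reduce); otherwise the busy ranks would block forever.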
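The deadlock argument can be simulated with threads. In this sketch a `threading.Barrier` stands in for the per-step collective (it releases only when every rank arrives), and, as a simplification, the total number of steps is known up front instead of being agreed through the all-reduce. `simulate_dp_steps` is a hypothetical name.

```python
import threading
from typing import List, Tuple


def simulate_dp_steps(workloads: List[int]) -> List[Tuple[int, int]]:
    """Run one thread per DP rank; return (real_steps, dummy_steps) per rank."""
    n = len(workloads)
    # Stand-in for the collective: nobody proceeds until all ranks arrive.
    barrier = threading.Barrier(n, timeout=5)
    total_steps = max(workloads)  # simplification: stop point known in advance
    results: List[Tuple[int, int]] = [(0, 0)] * n

    def rank_loop(rank: int) -> None:
        real = dummy = 0
        remaining = workloads[rank]
        for _ in range(total_steps):
            if remaining > 0:
                remaining -= 1
                real += 1
            else:
                dummy += 1  # execute_dummy_batch(): no work, but joins the collective
            barrier.wait()  # skipping this on an idle rank would stall the others
        results[rank] = (real, dummy)

    threads = [threading.Thread(target=rank_loop, args=(r,)) for r in range(n)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results
```

With workloads `[3, 0, 1]`, every rank passes the barrier three times: rank 1 runs only dummy steps and rank 2 runs one real step plus two dummy ones. Removing the `barrier.wait()` call from the dummy branch would make the busy ranks time out instead.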

[Flowchart (run_busy_loop, DP) not reproduced. Recoverable steps: _process_input_queue → EEP scaling? (Yes: progress + check complete) → _process_engine_step → _maybe_publish_request_counts → executed? (No + no requests + not running: continue; No + running: execute_dummy_batch) → _has_global_unfinished_reqs → engines_running? (No: send wave_complete, current_wave++, step_counter=0)]

3.5.5 _has_global_unfinished_reqs: DP consensus algorithm (lines 1392-1410)
    def _has_global_unfinished_reqs(self, local_unfinished: bool) -> bool:
        self.step_counter += 1
        if self.step_counter % 32 != 0:
            return True

        has_unfinished, pause_consensus = ParallelConfig.sync_dp_state(
            self.dp_group,
            has_unfinished=local_unfinished,
            pending_pause=self.pending_pause,
        )

        if pause_consensus:
            self.ignore_start_dp_wave = True
            self.pending_pause = False

        return has_unfinished

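Optimization: the all-reduce runs only once every 32 steps. On the other steps the method optimistically assumes that unfinished requests remain (returns True), which avoids frequent synchronization overhead. The trade-off is that a rank may keep stepping, possibly on dummy batches, for up to 31 extra steps after all ranks have run out of work.

pause_consensus: once all DP ranks agree to pause, ignore_start_dp_wave is set (and pending_pause is cleared) so that a stale START_DP_WAVE message cannot wake the engine up again.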
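The gating logic can be exercised without a process group. In this sketch `DPStateGate` is a hypothetical wrapper, and `sync_fn` stands in for `ParallelConfig.sync_dp_state`, assumed to return `(has_unfinished, pause_consensus)` as in the excerpt.

```python
from typing import Callable, Tuple

# (local_unfinished, pending_pause) -> (global_has_unfinished, pause_consensus)
SyncFn = Callable[[bool, bool], Tuple[bool, bool]]


class DPStateGate:
    """Hypothetical wrapper reproducing the step_counter % 32 gating."""

    def __init__(self, sync_fn: SyncFn, interval: int = 32) -> None:
        self.sync_fn = sync_fn
        self.interval = interval
        self.step_counter = 0
        self.pending_pause = False
        self.ignore_start_dp_wave = False

    def has_global_unfinished(self, local_unfinished: bool) -> bool:
        self.step_counter += 1
        if self.step_counter % self.interval != 0:
            return True  # optimistic: assume some rank still has work
        has_unfinished, pause_consensus = self.sync_fn(local_unfinished, self.pending_pause)
        if pause_consensus:
            # Every rank agreed to pause: ignore stale START_DP_WAVE messages.
            self.ignore_start_dp_wave = True
            self.pending_pause = False
        return has_unfinished
```

With a single-rank fake `sync_fn` that echoes its inputs, 31 calls return True without syncing, and the 32nd performs exactly one sync and returns the reduced result.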

3.5.6 reinitialize_distributed: elastic reconfiguration (lines 1412-1465)
    def reinitialize_distributed(
        self, reconfig_request: ReconfigureDistributedRequest
    ) -> None:
        new_parallel_config = deepcopy(self.vllm_config.parallel_config)
        # Update the DP configuration...
        is_scale_down = reconfig_request.new_data_parallel_size < old_dp_size
        is_shutdown = (
            reconfig_request.new_data_parallel_rank
            == ReconfigureRankType.SHUTDOWN_CURRENT_RANK
        )
        self.eep_scaling_state = ElasticEPScalingState(
            model_executor=self.model_executor,
            engine_core=self,
            vllm_config=self.vllm_config,
            new_parallel_config=new_parallel_config,
            worker_type="removing" if is_shutdown else "existing",
            scale_type="scale_down" if is_scale_down else "scale_up",
            reconfig_request=reconfig_request,
        )
        self.process_input_queue_block = False

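Elastic scaling

  1. Deep-copy parallel_config and update the DP parameters
  2. Create the ElasticEPScalingState state machine
  3. Set process_input_queue_block = False: input is processed without blocking so that the state machine can keep advancing
  4. The state machine's progress() is called on every iteration of run_busy_loop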
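The polling structure from steps 3 and 4 can be sketched with a toy state machine. `ToyScalingState` and `run_until_scaled` are hypothetical; only the `progress()` / `is_complete()` / `worker_type` shape is taken from the excerpt, and the real ElasticEPScalingState stages are not modeled.

```python
from typing import List


class ToyScalingState:
    """Hypothetical stand-in for ElasticEPScalingState: one stage per progress()."""

    def __init__(self, stages: List[str], worker_type: str) -> None:
        self.stages = list(stages)
        self.worker_type = worker_type
        self.completed: List[str] = []

    def progress(self) -> bool:
        if self.stages:
            self.completed.append(self.stages.pop(0))
            return True
        return False

    def is_complete(self) -> bool:
        return not self.stages


def run_until_scaled(state: ToyScalingState, max_iters: int = 100) -> int:
    """Poll `state` once per loop iteration and return the iteration it finished on."""
    for iteration in range(1, max_iters + 1):
        state.progress()
        if state.is_complete():
            if state.worker_type == "removing":
                raise SystemExit  # departing ranks leave the busy loop
            return iteration
        # The real loop processes input non-blocking here, so the state
        # machine is revisited promptly on the next iteration.
    raise RuntimeError("scaling did not complete")
```

Because each iteration advances the state machine by at most one stage, blocking on the input queue would stall scaling, which is why the excerpt sets process_input_queue_block = False while a state machine is active.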
3.5.7 _eep_send_engine_core_notification (lines 1467-1500)
    def _eep_send_engine_core_notification(
        self, notification_type, vllm_config=None,
    ):
        notification_data = (notification_type.value, dp_rank)
        outputs = EngineCoreOutputs(
            utility_output=UtilityOutput(
                call_id=EEP_NOTIFICATION_CALL_ID,
                result=UtilityResult(notification_data),
            )
        )
        if hasattr(self, "output_thread") and self.output_thread.is_alive():
            self.output_queue.put_nowait((0, outputs))
        else:
            # The output thread has not started yet: send directly over ZMQ
            encoder = MsgpackEncoder()
            with (
                zmq.Context() as ctx,
                make_zmq_socket(ctx, self.addresses.outputs[0], zmq.PUSH, linger=4000) as socket,
            ):
                socket.send_multipart(encoder.encode(outputs))

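Two send paths

  • Output thread already running: send indirectly through output_queue (thread-safe)
  • Output thread not yet running: create a temporary ZMQ PUSH socket and send directly (only happens during initialization)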
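The branch can be sketched without ZMQ. `send_notification` is a hypothetical helper, and `direct_send` is a stand-in for opening the temporary PUSH socket. Passing `None` for the thread plays the role of the excerpt's `hasattr(self, "output_thread")` check.

```python
import queue
import threading
from typing import Callable, Optional, Tuple


def send_notification(
    payload: bytes,
    output_queue: "queue.Queue[Tuple[int, bytes]]",
    output_thread: Optional[threading.Thread],
    direct_send: Callable[[bytes], None],
) -> str:
    """Route through the output thread when it is alive, otherwise send directly."""
    if output_thread is not None and output_thread.is_alive():
        output_queue.put_nowait((0, payload))  # client index 0, as in the excerpt
        return "queued"
    direct_send(payload)  # hypothetical stand-in for a temporary ZMQ PUSH socket
    return "direct"
```

Routing through the queue once the thread exists keeps a single writer on the output sockets; the direct path is only safe because nothing else is sending yet.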

3.6 EngineCoreActorMixin: the Ray actor base class

3.6.1 The __init__ method (lines 1508-1540)
class EngineCoreActorMixin:
    def __init__(self, vllm_config, addresses, dp_rank=0, local_dp_rank=0):
        maybe_init_worker_tracer(...)
        self.addresses = addresses
        vllm_config.parallel_config.data_parallel_index = dp_rank
        vllm_config.parallel_config.data_parallel_rank_local = local_dp_rank
        self._set_visible_devices(vllm_config, local_dp_rank)

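Special handling for Ray actors

  1. No ZMQ handshake is needed (the addresses are known before the actor is created)
  2. CUDA_VISIBLE_DEVICES must be set manually (Ray manages GPUs differently from a directly spawned process)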
    def _set_cuda_visible_devices(self, vllm_config, local_dp_rank, device_control_env_var):
        world_size = vllm_config.parallel_config.world_size
        try:
            value = get_device_indices(device_control_env_var, local_dp_rank, world_size)
            os.environ[device_control_env_var] = value
        except IndexError as e:
            raise Exception(...)

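Device visibility: computes which GPUs this rank should see from local_dp_rank and world_size, then writes the result to the device-control environment variable.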
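A sketch of what such a computation could look like. It rests on an assumption not confirmed by the excerpt: each DP rank owns a contiguous block of `world_size` devices, taken from the parent's device list when one is already set. `compute_visible_devices` is a hypothetical name, not vLLM's `get_device_indices`.

```python
from typing import Optional


def compute_visible_devices(
    local_dp_rank: int, world_size: int, existing: Optional[str] = None
) -> str:
    """Return a comma-separated device list for one DP rank (hypothetical).

    Assumes rank r owns devices [r * world_size, (r + 1) * world_size) of the
    parent's visible list, or of physical ids 0..N-1 when nothing is set.
    """
    start = local_dp_rank * world_size
    indices = range(start, start + world_size)
    if existing is None:
        return ",".join(str(i) for i in indices)
    parent = existing.split(",")
    # Raises IndexError if the parent exposes too few devices, the same
    # exception type the excerpt catches.
    return ",".join(parent[i] for i in indices)
```

Under this assumption, with `world_size=2` rank 1 sees devices "2,3", or "6,7" if the parent was restricted to "4,5,6,7".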

    @contextmanager
    def _perform_handshakes(self, handshake_address, identity, local_client, ...):
        yield self.addresses

Simplified handshake: in Ray actor mode no ZMQ handshake is needed; the addresses are known before the actor is created, so the method simply yields them.

    def wait_for_init(self):
        pass

    def run(self):
        try:
            self.run_busy_loop()
        except SystemExit:
            logger.debug("EngineCore exiting.")
            raise
        except Exception:
            logger.exception("EngineCore encountered a fatal error.")
            raise
        finally:
            self.shutdown()

Ray actor lifecycle

  • wait_for_init(): an empty method; invoking it via ray.get() guarantees that __init__ has completed
  • run(): starts the busy loop; its exception handling mirrors run_engine_core

3.7 DPMoEEngineCoreActor: the MoE DP Ray actor

class DPMoEEngineCoreActor(EngineCoreActorMixin, DPEngineCoreProc):
    def __init__(self, vllm_config, local_client, addresses, executor_class, log_stats, dp_rank=0, local_dp_rank=0):
        vllm_config.parallel_config.data_parallel_rank = dp_rank
        EngineCoreActorMixin.__init__(self, vllm_config, addresses, dp_rank, local_dp_rank)
        DPEngineCoreProc.__init__(self, vllm_config, local_client, "", executor_class, log_stats)

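MRO (method resolution order): DPMoEEngineCoreActor → EngineCoreActorMixin → DPEngineCoreProc → EngineCoreProc → EngineCore

Initialization order:

  1. Set the DP rank
  2. EngineCoreActorMixin.__init__: device visibility + tracer
  3. DPEngineCoreProc.__init__: the DP engine core (an empty string is passed as the handshake address, since no handshake is needed)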
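Toy classes with the same inheritance shape make the MRO concrete and show why the empty handshake address is harmless: when the engine's `__init__` calls `self._perform_handshakes()`, lookup follows the actor's MRO and finds the mixin's override first. These classes are hypothetical reductions (the real `_perform_handshakes` is a context manager taking more arguments).

```python
class EngineCore:
    def __init__(self) -> None:
        self.ready = True


class EngineCoreProc(EngineCore):
    def __init__(self) -> None:
        # Resolved through type(self).__mro__, so an earlier mixin can override it.
        self.addresses = self._perform_handshakes()
        super().__init__()

    def _perform_handshakes(self) -> str:
        return "addresses-from-zmq-handshake"


class DPEngineCoreProc(EngineCoreProc):
    pass


class EngineCoreActorMixin:
    def __init__(self, addresses: str) -> None:
        self.addresses = addresses

    def _perform_handshakes(self) -> str:
        # No handshake: hand back the addresses passed to the actor.
        return self.addresses


class DPMoEEngineCoreActor(EngineCoreActorMixin, DPEngineCoreProc):
    def __init__(self, addresses: str) -> None:
        # Explicit base-class calls, in the same order as the excerpt.
        EngineCoreActorMixin.__init__(self, addresses)
        DPEngineCoreProc.__init__(self)
```

`DPMoEEngineCoreActor.__mro__` lists the classes in the order given above, followed by `object`; a plain `EngineCoreProc()` still goes through its own handshake.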

3.8 EngineCoreActor: the non-MoE Ray actor

class EngineCoreActor(EngineCoreActorMixin, EngineCoreProc):
    def __init__(self, vllm_config, local_client, addresses, executor_class, log_stats, dp_rank=0, local_dp_rank=0):
        vllm_config.parallel_config.data_parallel_size = 1
        vllm_config.parallel_config.data_parallel_size_local = 1
        vllm_config.parallel_config.data_parallel_rank = 0
        EngineCoreActorMixin.__init__(self, vllm_config, addresses, dp_rank, local_dp_rank)
        EngineCoreProc.__init__(self, vllm_config, local_client, "", executor_class, log_stats, engine_index=dp_rank)

Non-MoE DP: consistent with the logic in run_engine_core, the DP size is reset to 1 and each rank runs as an independent engine.


IV. Mermaid diagram summary

4.1 Class inheritance diagram

Inheritance: EngineCoreProc extends EngineCore; DPEngineCoreProc extends EngineCoreProc; DPMoEEngineCoreActor(EngineCoreActorMixin, DPEngineCoreProc); EngineCoreActor(EngineCoreActorMixin, EngineCoreProc).

«Core engine» EngineCore
  Fields: vllm_config: VllmConfig, model_executor: Executor, scheduler: SchedulerInterface, structured_output_manager: StructuredOutputManager, batch_queue: deque | None, step_fn: Callable, aborts_queue: Queue
  Methods: __init__(vllm_config, executor_class, ...), _initialize_kv_caches(vllm_config): KVCacheConfig, add_request(request, request_wave), abort_requests(request_ids), step(): tuple, step_with_batch_queue(): tuple, post_step(model_executed), pause_scheduler(mode, clear_cache): Future, resume_scheduler(), sleep(level, mode): Future, wake_up(tags), shutdown(), preprocess_add_request(request): tuple

«Subprocess engine» EngineCoreProc
  Fields: input_queue: Queue, output_queue: Queue, engines_running: bool, shutdown_state: EngineShutdownState, addresses: EngineZmqAddresses, ENGINE_CORE_DEAD: bytes
  Methods: run_engine_core() (static), run_busy_loop(), has_work(): bool, _process_input_queue(), _process_engine_step(): bool, _handle_shutdown(): bool, _handle_client_request(type, request), process_input_sockets(...), process_output_sockets(...), pause_scheduler(mode, clear_cache): Future

«MoE DP engine» DPEngineCoreProc
  Fields: step_counter: int, current_wave: int, pending_pause: bool, ignore_start_dp_wave: bool, dp_rank: int, dp_size: int, dp_group: ProcessGroup, eep_scaling_state: ElasticEPScalingState
  Methods: run_busy_loop(), _has_global_unfinished_reqs(): bool, _init_data_parallel(vllm_config), add_request(request, request_wave), resume_scheduler(), reinitialize_distributed(reconfig_request)

«Ray Actor Mixin» EngineCoreActorMixin
  Fields: addresses: EngineZmqAddresses
  Methods: _set_visible_devices(vllm_config, local_dp_rank), _perform_handshakes(...): Generator, wait_for_init(), run()

«Ray MoE DP Actor» DPMoEEngineCoreActor

«Ray Non-MoE Actor» EngineCoreActor

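4.2 Engine core main loop flow

[Flowchart not reproduced. Recoverable nodes: run_busy_loop entry; _handle_shutdown? (branches RUNNING / REQUESTED); _process_input_queue; timeout=0? (Yes: abort all requests; No: drain in-flight requests); SHUTTING_DOWN; has_work? (No: SystemExit); _process_engine_step; back to the shutdown check]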

4.3 Step main flow (without batch queue)

[Diagram failed to render in the source page (Mermaid parse error). Recoverable fragment: a No branch returns {}, False]

4.4 Step with batch queue flow

[Flowchart not reproduced. Recoverable steps: step_with_batch_queue → scheduler.has_requests? → Yes: schedule + execute_model (async) → deferred sampling? (Yes: mark deferred) → batch_queue.appendleft → queue not full and oldest batch not done? (Yes: return None, True; No: wait for the oldest batch). No + queue empty: return None, False. No + queue non-empty: batch_queue.pop + future.result → process_aborts + update_from_output → any deferred batch? (Yes: draft_tokens + grammar + sample, then enqueue) → return outputs]

4.5 ZMQ communication architecture

[Diagram failed to render in the source page (Mermaid parse error). Recoverable fragment: input thread (DEALER socket) → input_queue]

4.6 Data-parallel (MoE) busy loop

[Diagram failed to render in the source page (Mermaid parse error). Recoverable fragment: send wave_complete → current_wave++ → step_counter=0; the same flow is described in 3.5.4]

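4.7 Handshake protocol sequence

Participants: EngineCoreProc, Frontend (ROUTER), DP Coordinator (optional).

  1. EngineCoreProc → Frontend: HELLO {local, headless}
  2. Frontend → EngineCoreProc: EngineHandshakeMetadata {addresses, parallel_config}
  3. EngineCoreProc initializes the engine (model loading, KV cache, scheduler)
  4. EngineCoreProc → Frontend: READY {local, headless, parallel_config_hash}
  5. EngineCoreProc sends an EngineCoreReadyResponse on each input socket
  6. DP Coordinator → EngineCoreProc: READY (if DP)
  7. EngineCoreProc sets ready_event and starts the busy loop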

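4.8 Sleep/Wake lifecycle

  • RUNNING → sleep(level=0) → scheduling paused, GPU state unchanged → wake_up(tags=[scheduling]) → RUNNING
  • RUNNING → sleep(level=1) → weights offloaded to CPU, KV cache discarded → wake_up() → RUNNING
  • RUNNING → sleep(level=2) → all GPU memory released → wake_up() → RUNNING
  • Pause modes: mode=abort aborts all requests; mode=wait waits for in-flight requests to drain; mode=keep freezes the queues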

4.9 Request lifecycle

[Diagram failed to render in the source page (Mermaid parse error). Recoverable fragment: preprocess_add_request → MM cache + Request]

4.10 Utility RPC mechanism

[Diagram failed to render in the source page (Mermaid parse error). Recoverable fragment: the frontend sends a UTILITY request (client_idx, call_...)]

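V. Design patterns and architectural insights

5.1 Producer-consumer pattern

The engine core uses a three-queue architecture:

  1. input_queue: input thread → busy loop
  2. output_queue: busy loop → output thread
  3. aborts_queue: input thread → busy loop (fast path)

This design decouples ZMQ I/O from GPU computation and takes advantage of the fact that ZMQ releases the GIL during socket operations, so I/O overlaps with computation.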
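A minimal sketch of the input-thread → busy-loop → output-thread pipeline using `queue.Queue`, with the busy loop running on the calling thread. ZMQ, decoding and the `aborts_queue` fast path are left out; `run_three_queue_pipeline` and the `_STOP` sentinel are hypothetical.

```python
import queue
import threading
from typing import List

_STOP = object()  # sentinel that shuts each stage down


def run_three_queue_pipeline(requests: List[str]) -> List[str]:
    """Toy input thread -> busy loop -> output thread pipeline."""
    input_queue: "queue.Queue[object]" = queue.Queue()
    output_queue: "queue.Queue[object]" = queue.Queue()
    sent: List[object] = []

    def input_thread() -> None:
        # Stands in for the ZMQ input socket thread (decode + preprocess).
        for req in requests:
            input_queue.put(req)
        input_queue.put(_STOP)

    def output_thread() -> None:
        # Stands in for the ZMQ output socket thread (encode + send).
        while (item := output_queue.get()) is not _STOP:
            sent.append(item)

    threads = [threading.Thread(target=input_thread), threading.Thread(target=output_thread)]
    for t in threads:
        t.start()
    # Busy loop: the only place "model work" happens, never touching sockets.
    while (item := input_queue.get()) is not _STOP:
        output_queue.put(f"output({item})")
    output_queue.put(_STOP)
    for t in threads:
        t.join()
    return sent
```

Each queue has exactly one producer and one consumer, so ordering is preserved end to end.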

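5.2 Batch pipeline pattern

For pipeline parallelism, batch_queue implements asynchronous double buffering:

  • new batches produced by the scheduler are pushed onto the left end of the queue
  • the oldest batch is taken from the right end and its result awaited
  • filling the queue takes priority over collecting results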
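The three rules above can be sketched with a `deque` of `concurrent.futures.Future` objects. `step_with_queue` and `run_pipeline` are hypothetical simplifications: deferred sampling, aborts and scheduler updates are omitted, and squaring a number stands in for executing a batch.

```python
from collections import deque
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable, Deque, List, Optional


def step_with_queue(
    pending: List[int],
    batch_queue: "Deque[Future]",
    max_in_flight: int,
    pool: ThreadPoolExecutor,
    execute: Callable[[int], int],
) -> Optional[int]:
    """One call of a simplified step_with_batch_queue."""
    if pending and len(batch_queue) < max_in_flight:
        # New batches enter at the left end of the deque.
        batch_queue.appendleft(pool.submit(execute, pending.pop(0)))
        if len(batch_queue) < max_in_flight and not batch_queue[-1].done():
            return None  # prefer filling the pipeline over blocking on a result
    if not batch_queue:
        return None
    # The oldest batch sits at the right end; block until it has finished.
    return batch_queue.pop().result()


def run_pipeline(batches: List[int], max_in_flight: int) -> List[int]:
    pending: List[int] = list(batches)
    batch_queue: "Deque[Future]" = deque()
    results: List[int] = []
    with ThreadPoolExecutor(max_workers=max_in_flight) as pool:
        while pending or batch_queue:
            out = step_with_queue(pending, batch_queue, max_in_flight, pool, lambda x: x * x)
            if out is not None:
                results.append(out)
    return results
```

Because results are always taken from the oldest end, outputs come back in submission order even though up to `max_in_flight` batches run concurrently.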

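5.3 DP wave coordination pattern

MoE data parallelism uses a wave coordination protocol:

  • requests are grouped into waves
  • the next wave starts only after every DP rank has finished the current one
  • consensus is reached through an all-reduce (once every 32 steps)
  • idle ranks execute dummy batches to stay in sync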

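5.4 Thread-safety design

Key thread-safety guarantees:

  • mm_receiver_cache: accessed only on the input thread (after initialization it is used only by preprocess_add_request)
  • structured_output_manager.grammar_init: called only on the input thread
  • scheduler: accessed only on the busy-loop thread, which serializes all access
  • aborts_queue: Python's queue.Queue is itself thread-safe
  • input_queue / output_queue: Python queue.Queue, thread-safe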

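5.5 Elastic scaling state machine

Elastic EP is implemented with ElasticEPScalingState:

  • New engines: worker_type="new"; the state machine starts before KV cache initialization
  • Existing engines: worker_type="existing"; the state machine starts after a reconfiguration request arrives
  • Departing engines: worker_type="removing"; raise SystemExit once scaling completes
  • process_input_queue_block = False: input is processed without blocking so the state machine can keep advancing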

VI. Key data-flow summary

6.1 Full request-processing path

API Request
  → InputProcessor.assign_request_id()
  → EngineCoreRequest (msgspec.Struct)
  → ZMQ DEALER (msgpack encoded, type=b"\x00")
  → Input Socket Thread (decode + preprocess)
  → input_queue (ADD, (Request, wave))
  → Busy Loop (_handle_client_request)
  → scheduler.add_request()
  → scheduler.schedule() → SchedulerOutput
  → executor.execute_model() → Future<ModelRunnerOutput>
  → scheduler.update_from_output() → EngineCoreOutputs
  → output_queue (client_index, EngineCoreOutputs)
  → Output Socket Thread (encode + zero-copy ZMQ send)
  → Frontend PULL socket
  → EngineCoreClient
  → RequestOutput

6.2 Full initialization path

EngineCoreProc.__init__()
  → _perform_handshakes() → ZMQ addresses
  → EngineCore.__init__()
    → load_general_plugins()
    → executor_class(vllm_config) → model loading
    → _initialize_kv_caches()
      → executor.get_kv_cache_specs()
      → executor.determine_available_memory() (profile)
      → get_kv_cache_configs() → auto-fit max_model_len
      → executor.initialize_from_config() (allocate + warmup)
    → StructuredOutputManager()
    → Scheduler(vllm_config, kv_cache_config, ...)
    → freeze_gc_heap() + enable_envs_cache()
  → Start I/O threads
  → Wait for DP Coordinator READY