vLLM V1 Worker 超深度架构分析 — Part 2: 输入批处理与采样系统

分析目标目录: github.com/vllm/vllm/v1/worker
本卷聚焦: InputBatch 批次管理 | 采样管线 | Logprobs | 微批次调度


目录

  1. 输入批处理系统总览
  2. CachedRequestState 请求缓存状态
  3. InputBatch (顶层) 深度分析
  4. GPU InputBatch 深度分析
  5. TPU InputBatch 深度分析
  6. 微批次调度系统
  7. UBatchWrapper 微批次包装器
  8. 采样管线总览
  9. Sampler 采样器核心
  10. SamplingStates 采样状态
  11. Penalties 惩罚系统
  12. Logprob 对数概率计算
  13. PromptLogprob 提示对数概率
  14. LogitBias 偏置应用
  15. MinP / Gumbel 采样
  16. BadWords 坏词过滤
  17. StructuredOutputs 结构化输出

一、输入批处理系统总览

1.1 系统定位

输入批处理系统是 Worker 推理管线中调度器输出与模型执行之间的桥梁。它负责:

职责 说明
请求状态管理 缓存每个请求的采样参数、token 历史、LoRA 配置等
输入张量组装 将多个请求的 token 拼装成连续的 input_ids/positions 张量
KV Cache 映射 维护 slot_mapping 和 block_table 映射关系
注意力元数据 构建 seq_lens、query_start_loc 等注意力计算所需的元数据
微批次拆分 将大批次拆分为多个微批次,优化 GPU 利用率
采样参数准备 为采样器准备 temperature、top_k、top_p 等参数张量

1.2 批处理架构图

下游消费者

输入批处理系统

调度器输出

微批次

SchedulerOutput

NewRequestData
新请求数据

CachedRequestData
缓存请求数据

SwapIn/Out
KV Cache换页

CopyOnWrite
KV Cache复制

CachedRequestState
请求缓存状态

InputBatch
顶层批次管理

gpu/InputBatch
GPU批次张量

BlockTable
块表映射

ubatch_utils
微批次工具

ubatching
微批次调度

UBatchWrapper
微批次包装器

GPUModelRunner

Sampler

Attention Backend

1.3 数据流图

输出到模型

InputBatch 内部处理

填充input_ids

填充positions

更新block_table

更新seq_lens

构建slot_mapping

准备采样参数

调度器输出 -> InputBatch

SchedulerOutput

新增请求

恢复请求

更新token数

完成请求

CachedRequestState

input_ids
[num_tokens]

positions
[num_tokens]

block_table
[num_reqs, max_blocks]

seq_lens
[num_reqs]

slot_mapping
[num_tokens]

sampling_params
[num_reqs]

model.forward()

Paged Attention

Sampler


二、CachedRequestState 请求缓存状态

2.1 数据类定义

文件: gpu_input_batch.py (前部分)

@dataclass
class CachedRequestState:
    """缓存请求状态 - 存储单个请求的所有运行时状态。
    
    每个活跃请求都有一个对应的 CachedRequestState 实例,
    在请求的生命周期内持续更新。
    
    设计目的:
    - 避免每步都从调度器重新构建请求状态
    - 缓存不变的状态(如 sampling_params),只更新变化的部分
    - 为输入张量组装提供快速访问
    """
    
    # === 基础请求信息 ===
    request_id: str              # 请求唯一标识符
    prompt_token_ids: tuple[int, ...]  # 提示 token IDs(不可变)
    prompt_embeds: Optional[torch.Tensor] = None  # 提示嵌入(可选,替代 token IDs)
    
    # === 采样参数 ===
    sampling_params: SamplingParams  # 采样参数(temperature, top_k, top_p 等)
    pooling_params: Optional[PoolingParams] = None  # 池化参数(pooling 模型用)
    
    # === 运行时状态 ===
    num_computed_tokens: int = 0     # 已计算的 token 数(包括 prompt + generated)
    block_ids: tuple[int, ...] = ()  # 分配的 KV Cache 物理块 ID 列表
    num_new_tokens: int = 0         # 本步新调度的 token 数
    
    # === LoRA 状态 ===
    lora_request: Optional[LoRARequest] = None  # LoRA 适配器请求
    
    # === 多模态状态 ===
    mm_feature_spec: Optional[MultiModalFeatureSpec] = None  # 多模态特征规格
    
    # === 输出状态 ===
    output_token_ids: list[int] = field(default_factory=list)  # 已生成的输出 token IDs
    
    # === 推理模式 ===
    is_floating_req: bool = False  # 是否为"浮动"请求(预填充未完成)

2.2 CachedRequestState 生命周期

请求到达

首次调度

继续预填充

预填充完成

生成新token

生成EOS/达到max_tokens

New

Prefilling

Decoding

Completed

CachedRequestState创建
prompt_token_ids确定

num_computed_tokens增长
block_ids可能扩展

num_computed_tokens+1/步
output_token_ids增长

2.3 CachedRequestState 与 InputBatch 的关系

批次级组装 (InputBatch)

请求级缓存

新token位置10

新token位置3,4,5

block_ids

block_ids

seq_len

seq_len

CachedRequestState
request_id=req_001
prompt=[1,2,3,4,5]
num_computed=10
block_ids=(0,1)

CachedRequestState
request_id=req_002
prompt=[6,7,8]
num_computed=3
block_ids=(2,)

input_ids
[5,6,7,8,...]
拼接所有新token

positions
[10,3,4,5,...]
各请求的新位置

block_table
[[0,1,-1],
[2,-1,-1]]

seq_lens
[11,4]
各请求当前长度


三、InputBatch (顶层) 深度分析

3.1 概述

文件: gpu_input_batch.py (1121行)

InputBatch 是顶层批次管理器,负责:

  1. 管理请求的增删改(add/remove/update)
  2. 维护 GPU 输入缓冲区(input_ids, positions, seq_lens 等)
  3. 管理块表映射(BlockTable)
  4. 构建注意力元数据所需的辅助信息
  5. 为采样器准备采样参数

3.2 类定义

class InputBatch:
    """输入批次 - 管理推理步骤的输入数据。
    
    核心数据结构:
    - requests: 请求数组 [max_num_reqs],存储 CachedRequestState
    - request_ids_to_indices: 请求 ID 到批次索引的映射
    - block_table: BlockTable 块表实例
    
    GPU 缓冲区:
    - input_ids: [max_num_tokens] 输入 token IDs
    - positions: [max_num_tokens] 位置编码
    - seq_lens: [max_num_reqs] 序列长度
    - query_start_loc: [max_num_reqs + 1] 查询起始位置
    
    设计理念:
    - 预分配固定大小的 GPU 缓冲区,避免动态分配
    - 使用 numpy 在 CPU 上构建,然后拷贝到 GPU
    - 支持 CUDA Graph(固定大小张量)
    """
    
    def __init__(
        self,
        vllm_config: VllmConfig,
        device: torch.device,
    ):
        # 配置参数
        self.vllm_config = vllm_config
        self.model_config = vllm_config.model_config
        self.cache_config = vllm_config.cache_config
        self.scheduler_config = vllm_config.scheduler_config
        self.parallel_config = vllm_config.parallel_config
        self.device = device
        
        # 批次容量
        self.max_num_reqs = scheduler_config.max_num_seqs
        self.max_num_tokens = scheduler_config.max_num_batched_tokens
        self.max_model_len = model_config.max_model_len
        
        # 块大小
        self.block_size = cache_config.block_size
        self.max_num_blocks_per_req = cdiv(self.max_model_len, self.block_size)
        
        # === 请求数据 ===
        # 请求数组 - 预分配,运行时按需使用
        self.requests: list[Optional[CachedRequestState]] = [None] * self.max_num_reqs
        # 请求 ID -> 批次索引映射
        self.request_ids_to_indices: dict[str, int] = {}
        # 当前批次中的请求数
        self.num_reqs = 0
        
        # === GPU 输入缓冲区 ===
        # 输入 token IDs - 包含所有请求的新 token
        self.input_ids = torch.zeros(
            self.max_num_tokens, dtype=torch.int32, device=device
        )
        # 位置编码 - 每个 token 在序列中的位置
        self.positions = torch.zeros(
            self.max_num_tokens, dtype=torch.int64, device=device
        )
        # 序列长度 - 每个请求的当前序列长度(含已生成的 token)
        self.seq_lens = torch.zeros(
            self.max_num_reqs, dtype=torch.int32, device=device
        )
        # 查询起始位置 - 用于标记每个请求的 token 在 input_ids 中的范围
        # query_start_loc[i] = 第 i 个请求的 token 起始位置
        # query_start_loc[i+1] - query_start_loc[i] = 第 i 个请求的 token 数
        self.query_start_loc = torch.zeros(
            self.max_num_reqs + 1, dtype=torch.int32, device=device
        )
        
        # === 块表 ===
        self.block_table = BlockTable(
            max_num_reqs=self.max_num_reqs,
            max_num_blocks_per_req=self.max_num_blocks_per_req,
            block_size=self.block_size,
            pin_memory=True,
        )

3.3 add_request() - 添加新请求

    def add_request(
        self,
        request_id: str,             # 请求 ID
        prompt_token_ids: tuple[int, ...],  # 提示 token IDs
        sampling_params: SamplingParams,     # 采样参数
        block_ids: tuple[int, ...],          # 分配的物理块 ID
        num_computed_tokens: int,            # 已计算的 token 数
        lora_request: Optional[LoRARequest] = None,  # LoRA 请求
        pooling_params: Optional[PoolingParams] = None,  # 池化参数
        mm_feature_spec: Optional[MultiModalFeatureSpec] = None,  # 多模态规格
    ) -> None:
        """将新请求添加到批次中。
        
        执行步骤:
        1. 找到空闲的批次索引
        2. 创建 CachedRequestState
        3. 更新块表
        4. 注册请求 ID -> 索引映射
        
        注意: 此方法只更新状态,不更新 GPU 缓冲区
        GPU 缓冲区在 _update_inputs() 中统一更新
        """
        # 步骤1: 分配批次索引
        # 如果有已完成的请求留下的空位,优先使用
        req_index = self.num_reqs  # 简化: 使用下一个空位
        
        # 步骤2: 创建缓存状态
        self.requests[req_index] = CachedRequestState(
            request_id=request_id,
            prompt_token_ids=prompt_token_ids,
            sampling_params=sampling_params,
            block_ids=block_ids,
            num_computed_tokens=num_computed_tokens,
            lora_request=lora_request,
            pooling_params=pooling_params,
            mm_feature_spec=mm_feature_spec,
        )
        
        # 步骤3: 更新块表
        self.block_table.add_row(req_index, list(block_ids))
        
        # 步骤4: 注册映射
        self.request_ids_to_indices[request_id] = req_index
        self.num_reqs += 1

3.4 remove_request() - 移除完成的请求

    def remove_request(self, request_id: str) -> None:
        """移除已完成的请求。
        
        执行步骤:
        1. 查找请求索引
        2. 清除块表行
        3. 清除缓存状态
        4. 更新映射
        5. 压缩批次(可选)
        """
        req_index = self.request_ids_to_indices.pop(request_id)
        self.requests[req_index] = None
        self.block_table.remove_row(req_index)
        self.num_reqs -= 1
        
        # 如果不是最后一个请求,需要压缩
        # 将最后一个请求移动到空位
        if req_index < self.num_reqs:
            last_index = self.num_reqs
            self.requests[req_index] = self.requests[last_index]
            self.block_table.move_row(last_index, req_index)
            # 更新映射
            moved_req = self.requests[req_index]
            self.request_ids_to_indices[moved_req.request_id] = req_index
            self.requests[last_index] = None

3.5 update() - 更新批次状态

    def update(
        self,
        scheduler_output: SchedulerOutput,
    ) -> None:
        """根据调度器输出更新批次状态。
        
        这是每个推理步骤的核心更新方法。
        
        执行步骤:
        1. 处理新请求 (add_request)
        2. 处理恢复请求 (resume_request)
        3. 处理 KV Cache swap/copy 操作
        4. 处理完成的请求 (remove_request)
        5. 更新已计算 token 数
        6. 更新 GPU 缓冲区
        """
        # 步骤1: 处理新请求
        for new_req_data in scheduler_output.scheduled_new_reqs:
            self.add_request(
                request_id=new_req_data.request_id,
                prompt_token_ids=new_req_data.prompt_token_ids,
                sampling_params=new_req_data.sampling_params,
                block_ids=new_req_data.block_ids,
                num_computed_tokens=new_req_data.num_computed_tokens,
                ...
            )
        
        # 步骤2: 处理恢复请求(从 preemption 恢复)
        for resumed_req in scheduler_output.scheduled_cached_reqs:
            req_index = self.request_ids_to_indices[resumed_req.request_id]
            # 更新块表和计算 token 数
            self._update_resumed_request(req_index, resumed_req)
        
        # 步骤3: 处理 KV Cache 操作
        # swap in: CPU -> GPU
        for swap_in_op in scheduler_output.blocks_to_swap_in:
            ...
        # swap out: GPU -> CPU
        for swap_out_op in scheduler_output.blocks_to_swap_out:
            ...
        # copy-on-write: 共享块需要时复制
        for copy_op in scheduler_output.blocks_to_copy:
            ...
        
        # 步骤4: 移除完成的请求
        for finished_req_id in scheduler_output.finished_req_ids:
            self.remove_request(finished_req_id)
        
        # 步骤5: 更新 GPU 输入缓冲区
        self._update_inputs()

3.6 _update_inputs() - 更新 GPU 输入张量

    def _update_inputs(self) -> None:
        """更新所有 GPU 输入缓冲区。
        
        将 CPU 上的请求状态数据转换为 GPU 张量。
        
        执行步骤:
        1. 计算每个请求的新 token 及其位置
        2. 填充 input_ids 和 positions
        3. 更新 seq_lens 和 query_start_loc
        4. 提交块表到 GPU
        5. 构建采样参数张量
        """
        offset = 0  # token 偏移量
        for i in range(self.num_reqs):
            req = self.requests[i]
            if req is None:
                continue
            
            # 计算新 token 的范围
            # 新 token = 从 num_computed_tokens 开始的 num_new_tokens 个 token
            start = req.num_computed_tokens
            num_new = req.num_new_tokens
            
            # 填充 input_ids
            if req.prompt_embeds is not None:
                # 使用嵌入而非 token IDs
                self.input_ids[offset:offset + num_new] = 0  # placeholder
            else:
                # 使用 token IDs
                if start < len(req.prompt_token_ids):
                    # 预填充阶段: 取 prompt token
                    ids = req.prompt_token_ids[start:start + num_new]
                else:
                    # 解码阶段: 取上一步生成的 token
                    gen_start = start - len(req.prompt_token_ids)
                    ids = req.output_token_ids[gen_start:gen_start + num_new]
                self.input_ids[offset:offset + num_new] = torch.tensor(ids, dtype=torch.int32)
            
            # 填充 positions
            self.positions[offset:offset + num_new] = torch.arange(
                start, start + num_new, dtype=torch.int64
            )
            
            # 更新 seq_lens
            self.seq_lens[i] = start + num_new
            
            # 更新 query_start_loc
            self.query_start_loc[i + 1] = self.query_start_loc[i] + num_new
            
            offset += num_new
        
        # 提交块表到 GPU
        self.block_table.commit(self.num_reqs)

3.7 InputBatch 操作流程图

SchedulerOutput 到达

update()

处理新请求

add_request()
创建 CachedRequestState
更新 block_table
注册 ID 映射

处理恢复请求

更新 block_ids
更新 num_computed_tokens

处理 KV Cache 操作

swap_in: CPU->GPU

swap_out: GPU->CPU

copy_on_write: 复制共享块

移除完成请求

remove_request()
清除 block_table
压缩批次

_update_inputs()

填充 input_ids
[num_tokens]

填充 positions
[num_tokens]

更新 seq_lens
[num_reqs]

更新 query_start_loc
[num_reqs+1]

commit block_table
到 GPU

3.8 input_ids / positions / seq_lens 布局示例

GPU 张量

示例: 3个请求

1 token

4 tokens

1 token

Req 0
prompt=[1,2,3]
已生成=[4,5]
新token=[6]

Req 1
prompt=[10,20,30,40]
新token=[50,60,70,80]

Req 2
prompt=[100]
已生成=[200]
新token=[300]

input_ids
[6, 50,60,70,80, 300]
offset: 0, 1, 5

positions
[5, 0,1,2,3, 1]
各token在序列中的位置

seq_lens
[6, 4, 2]
各请求当前总长度

query_start_loc
[0, 1, 5, 6]
标记各请求token范围


四、GPU InputBatch 深度分析

4.1 概述

文件: gpu/input_batch.py (588行)

GPU InputBatch 是 GPU 优化的批次实现,使用 Triton kernel 和 CUDA 优化的张量操作来构建输入数据。

4.2 InputBuffers 类

class InputBuffers:
    """GPU 输入缓冲区 - 预分配所有输入张量。
    
    与顶层 InputBatch 的区别:
    - 更底层,直接管理 GPU 张量
    - 使用 Triton kernel 进行数据填充(GPU 端完成,无需 CPU-GPU 传输)
    - 支持 CUDA Graph(固定大小张量)
    
    缓冲区列表:
    - input_ids: [max_num_tokens] 输入 token IDs
    - positions: [max_num_tokens] 位置编码
    - query_start_loc: [max_num_reqs+1] 查询起始位置
    - seq_lens: [max_num_reqs] 序列长度
    - dcp_local_seq_lens: [max_num_reqs] DCP 本地序列长度
    """
    
    def __init__(
        self,
        max_num_reqs: int,
        max_num_tokens: int,
        device: torch.device,
    ):
        self.max_num_reqs = max_num_reqs
        self.max_num_tokens = max_num_tokens
        self.device = device
        
        # 预分配所有缓冲区
        self.input_ids = torch.zeros(max_num_tokens, dtype=torch.int32, device=device)
        self.positions = torch.zeros(max_num_tokens, dtype=torch.int64, device=device)
        self.query_start_loc = torch.zeros(
            max_num_reqs + 1, dtype=torch.int32, device=device
        )
        self.seq_lens = torch.zeros(max_num_reqs, dtype=torch.int32, device=device)
        # DCP: per-request local seq_lens buffer
        self.dcp_local_seq_lens = torch.zeros(
            max_num_reqs, dtype=torch.int32, device=device
        )

4.3 GPU InputBatch 类

class InputBatch:
    """GPU 优化的输入批次管理。
    
    核心优化:
    1. 使用 Triton kernel 在 GPU 端构建输入(避免 CPU-GPU 数据传输)
    2. 预分配固定大小缓冲区(支持 CUDA Graph)
    3. 使用 UvaBackedTensor 实现 CPU-GPU 统一虚拟地址访问
    """
    
    def __init__(
        self,
        max_num_reqs: int,
        max_num_tokens: int,
        ...
    ):
        self.buffers = InputBuffers(max_num_reqs, max_num_tokens, device)
        # ... 其他初始化

4.4 GPU vs 顶层 InputBatch 对比

GPU InputBatch (gpu/input_batch.py)

顶层 InputBatch (gpu_input_batch.py)

数据量大时
传输成为瓶颈

全部在GPU完成

CPU 构建 + GPU 传输

numpy 操作

CPU->GPU copy_()

BlockTable (numpy+pinned)

GPU 端构建

Triton kernels

无CPU-GPU传输

GPUBlockTable (GPU tensor)

CPU-GPU 带宽瓶颈

零传输开销


五、TPU InputBatch 深度分析

5.1 概述

文件: tpu_input_batch.py (574行)

TPU InputBatch 是为 Google TPU 后端定制的批次管理实现。

5.2 TPU 与 GPU 的关键差异

class TPUInputBatch:
    """TPU 输入批次 - 为 TPU 后端定制。
    
    TPU 特殊性:
    - 使用 JAX/PyTorch XLA 而非 CUDA
    - 编译时确定形状(XLA 编译需要静态形状)
    - 无 CUDA Graph(使用 XLA 编译代替)
    - 使用 TPU 特定的内存布局
    """

5.3 TPU vs GPU InputBatch 对比

TPU InputBatch

GPU InputBatch

替换为

替换为

替换为

Triton Kernels

CUDA Graph

NCCL Communication

CUDA Memory Layout

XLA Compilation

PJRT Runtime

TPU Mesh Communication

HBM Memory Layout


六、微批次调度系统

6.1 概述

微批次(Micro-batching / UBatch)系统将大批次拆分为多个小批次,目的是:

  1. 提高 GPU 利用率 - 避免大批次导致的显存溢出
  2. 改善延迟 - 小批次更快完成,减少队头阻塞
  3. 支持 prefill-decode 分离 - 将 prefill 和 decode 拆开执行

6.2 ubatch_utils.py 分析

文件: ubatch_utils.py (265行)

"""
微批次工具 - 提供微批次的创建、拆分和合并功能。

核心数据结构:
- UBatchSlices: 描述一个微批次在原始批次中的切片范围
"""

@dataclass
class UBatchSlices:
    """微批次切片 - 描述微批次在原始批次中的范围。
    
    属性:
        req_start: 请求起始索引
        req_end: 请求结束索引(不含)
        token_start: token 起始索引
        token_end: token 结束索引(不含)
    """
    req_start: int
    req_end: int
    token_start: int
    token_end: int


def check_ubatch_thresholds(
    num_reqs: int,
    num_tokens: int,
    max_ubatch_size: int,
    max_ubatch_tokens: int,
) -> bool:
    """检查是否需要拆分微批次。
    
    判断条件:
    - num_reqs > max_ubatch_size → 需要拆分
    - num_tokens > max_ubatch_tokens → 需要拆分
    
    Returns:
        True 表示需要拆分
    """
    return num_reqs > max_ubatch_size or num_tokens > max_ubatch_tokens


def maybe_create_ubatch_slices(
    num_reqs: int,
    num_tokens: int,
    query_start_loc: torch.Tensor,
    max_ubatch_size: int,
    max_ubatch_tokens: int,
) -> list[UBatchSlices] | None:
    """尝试创建微批次切片。
    
    如果不需要拆分,返回 None。
    如果需要拆分,返回切片列表。
    
    拆分策略:
    1. 尽量保持请求的完整性(不拆分单个请求)
    2. 优先满足 max_ubatch_tokens 约束
    3. 在满足 token 约束的前提下尽量填满 max_ubatch_size
    """
    if not check_ubatch_thresholds(num_reqs, num_tokens, ...):
        return None
    
    slices = []
    req_start = 0
    while req_start < num_reqs:
        # 找到满足约束的最大请求范围
        req_end = _find_ubatch_boundary(
            req_start, num_reqs, query_start_loc, ...
        )
        slices.append(UBatchSlices(
            req_start=req_start,
            req_end=req_end,
            token_start=query_start_loc[req_start].item(),
            token_end=query_start_loc[req_end].item(),
        ))
        req_start = req_end
    
    return slices


def split_attn_metadata(
    attn_metadata: AttentionMetadata,
    ubatch_slices: list[UBatchSlices],
) -> list[AttentionMetadata]:
    """将注意力元数据按微批次切片拆分。
    
    每个微批次需要独立的注意力元数据,
    因为注意力计算是按批次独立执行的。
    """

6.3 ubatching.py 分析

文件: ubatching.py (241行)

"""
微批次调度逻辑 - 决定如何将批次拆分为微批次。

调度策略:
1. Prefill-Decode 分离: 将 prefill 和 decode 请求分别组批
2. 按大小排序: 将相似大小的请求组在一起
3. 自适应拆分: 根据当前 GPU 负载动态调整微批次大小
"""

6.4 微批次调度流程

SchedulerOutput

构建 InputBatch

需要拆分?

单次执行
全部token

maybe_create_ubatch_slices()

计算切片边界

UBatchSlices 0

UBatchSlices 1

UBatchSlices N

执行 UBatch 0

执行 UBatch 1

执行 UBatch N

合并结果

返回 ModelRunnerOutput

6.5 微批次拆分示意

各微批次独立执行

微批次拆分 (max_ubatch_size=4)

原始批次 (8 reqs, 512 tokens)

split

split

结果0

结果1

InputBatch
req 0-7
tokens 0-511

UBatch 0
req 0-3
tokens 0-255

UBatch 1
req 4-7
tokens 256-511

forward(ubatch_0)

forward(ubatch_1)

合并


七、UBatchWrapper 微批次包装器

7.1 概述

文件: gpu_ubatch_wrapper.py (510行)

UBatchWrapper 是微批次的执行包装器,负责:

  1. 管理微批次的输入/输出张量切片
  2. 调用模型前向传播和采样
  3. 将微批次结果合并到完整批次

7.2 核心方法

class UBatchWrapper:
    """微批次包装器 - 封装微批次的执行逻辑。"""
    
    def __init__(
        self,
        model_runner: GPUModelRunner,
        ...
    ):
        self.model_runner = model_runner
    
    def execute_ubatch(
        self,
        ubatch_slices: UBatchSlices,
        input_batch: InputBatch,
        ...
    ) -> ModelRunnerOutput:
        """执行一个微批次的推理。
        
        步骤:
        1. 切分输入张量
        2. 构建微批次的注意力元数据
        3. 执行前向传播
        4. 执行采样
        5. 返回结果
        """
        # 1. 切分输入
        input_ids = input_batch.input_ids[
            ubatch_slices.token_start:ubatch_slices.token_end
        ]
        positions = input_batch.positions[
            ubatch_slices.token_start:ubatch_slices.token_end
        ]
        # ... 其他张量切分
        
        # 2. 构建微批次注意力元数据
        ubatch_attn_metadata = split_attn_metadata(...)
        
        # 3. 前向传播
        hidden_states = self.model_runner.model.forward(
            input_ids=input_ids,
            positions=positions,
            kv_cache=kv_cache,
            attn_metadata=ubatch_attn_metadata,
        )
        
        # 4. 采样
        output = self.model_runner.sampler(...)
        
        return output

7.3 UBatchWrapper 执行流程

Sampler nn.Module UBatchWrapper GPUModelRunner Sampler nn.Module UBatchWrapper GPUModelRunner execute_ubatch(slices, input_batch) 切分 input_ids, positions, seq_lens 构建微批次 attn_metadata forward(ubatch_inputs) hidden_states / logits sample(logits, sampling_metadata) sampled_token_ids ModelRunnerOutput (微批次)

八、采样管线总览

8.1 采样管线架构

采样管线是模型前向传播之后的处理阶段,将 logits 转换为最终的 token ID。

输出

采样管线

模型输出

logits
[num_reqs, vocab_size]

Penalties
重复/频率/存在惩罚

LogitBias
偏置应用

Temperature
温度缩放

Top-K
取概率最高的K个

Top-P
累积概率截断

Min-P
最小概率阈值

Gumbel
Gumbel采样

BadWords
坏词过滤

Grammar
语法约束

sampled_token_ids
[num_reqs]

logprobs
[num_reqs, top_k]

8.2 采样管线类图

Sampler

-max_num_reqs: int

-vocab_size: int

-logprobs_mode: LogprobsMode

-states: SamplingStates

-penalties: PenaltiesState

-logit_bias: LogitBiasState

-bad_words: BadWordsState

-logprob_token_ids: LogprobTokenIdsState

+init(max_num_reqs, vocab_size, logprobs_mode)

+init_gpu_tensors(device)

+update_state(input_batch)

+forward(logits, sampling_metadata) : SamplerOutput

SamplingStates

+temperature: UvaBackedTensor

+top_k: UvaBackedTensor

+top_p: UvaBackedTensor

+min_p: UvaBackedTensor

+seed: UvaBackedTensor

+is_greedy: np.ndarray

+is_random: np.ndarray

PenaltiesState

+repetition_penalty: UvaBackedTensor

+frequency_penalty: UvaBackedTensor

+presence_penalty: UvaBackedTensor

+use_penalty: np.ndarray

+update_penalties(input_batch)

LogitBiasState

+logit_bias: Tensor

+has_logit_bias: np.ndarray

+apply(logits)

BadWordsState

+bad_words_token_ids: Tensor

+apply(logits)

LogprobTokenIdsState

+compute_topk_logprobs(logits)

8.3 采样管线执行流程

Greedy

Random (top-k/top-p)

Gumbel

logits [num_reqs, vocab_size]

apply_penalties()

apply_logit_bias()

apply_temperature()

采样类型判断

argmax(logits)

apply_top_k_top_p()

gumbel_sample()

采样得到 token_ids

apply_bad_words()

apply_grammar_bitmask()

compute_logprobs()

SamplerOutput
sampled_token_ids + logprobs


九、Sampler 采样器核心

9.1 概述

文件: gpu/sample/sampler.py (197行)

Sampler 是采样管线的入口和协调者,将各采样组件串联执行。

9.2 源码分析

class Sampler:
    """采样器 - 协调所有采样组件的执行。
    
    采样流程:
    1. 应用惩罚(重复/频率/存在)
    2. 应用 logit 偏置
    3. 应用温度缩放
    4. 应用 top-k / top-p 过滤
    5. 执行采样(贪心/随机/Gumbel)
    6. 应用坏词过滤
    7. 应用语法约束
    8. 计算对数概率
    """
    
    def __init__(
        self,
        max_num_reqs: int,     # 最大请求数
        vocab_size: int,        # 词汇表大小
        logprobs_mode: LogprobsMode,  # 对数概率模式
    ):
        self.max_num_reqs = max_num_reqs
        self.vocab_size = vocab_size
        self.logprobs_mode = logprobs_mode
        
        # 初始化各采样组件
        self.states = SamplingStates(max_num_reqs, vocab_size)
        self.penalties = PenaltiesState(...)
        self.logit_bias = LogitBiasState(...)
        self.bad_words = BadWordsState(...)
        self.logprob_token_ids = LogprobTokenIdsState(...)
    
    def init_gpu_tensors(self, device: torch.device) -> None:
        """初始化 GPU 张量。"""
        self.states.init_gpu_tensors(device)
        self.penalties.init_gpu_tensors(device)
        self.logit_bias.init_gpu_tensors(device)
    
    def update_state(self, input_batch) -> None:
        """更新采样状态。
        
        从 InputBatch 中提取各请求的采样参数,
        更新到 GPU 张量中。
        """
        self.states.update(input_batch)
        self.penalties.update(input_batch)
        self.logit_bias.update(input_batch)
        self.bad_words.update(input_batch)
    
    def forward(
        self,
        logits: torch.Tensor,           # [num_reqs, vocab_size]
        sampling_metadata: SamplingMetadata,  # 采样元数据
    ) -> SamplerOutput:
        """执行采样。
        
        Args:
            logits: 模型输出的原始 logits
            sampling_metadata: 采样参数元数据
            
        Returns:
            SamplerOutput: 包含 sampled_token_ids 和 logprobs
        """
        # 步骤1: 应用惩罚
        logits = self.penalties.apply(logits, ...)
        
        # 步骤2: 应用 logit 偏置
        logits = self.logit_bias.apply(logits, ...)
        
        # 步骤3: 分类采样 - 根据采样类型分组
        # 贪心采样组
        greedy_mask = self.states.is_greedy[:num_reqs]
        # 随机采样组
        random_mask = self.states.is_random[:num_reqs]
        
        # 步骤4: 贪心采样
        if greedy_mask.any():
            greedy_token_ids = torch.argmax(logits[greedy_mask], dim=-1)
        
        # 步骤5: 随机采样
        if random_mask.any():
            # 应用温度
            temp = self.states.temperature[:num_reqs][random_mask]
            logits[random_mask] = logits[random_mask] / temp.unsqueeze(-1)
            
            # 应用 top-k / top-p
            logits[random_mask] = apply_top_k_top_p(
                logits[random_mask],
                self.states.top_k[:num_reqs][random_mask],
                self.states.top_p[:num_reqs][random_mask],
            )
            
            # 应用 min-p
            logits[random_mask] = apply_min_p(
                logits[random_mask],
                self.states.min_p[:num_reqs][random_mask],
            )
            
            # 采样
            random_token_ids = gumbel_sample(
                logits[random_mask],
                self.states.seed[:num_reqs][random_mask],
            )
        
        # 步骤6: 合并结果
        sampled_token_ids = torch.empty(num_reqs, dtype=torch.int32)
        sampled_token_ids[greedy_mask] = greedy_token_ids
        sampled_token_ids[random_mask] = random_token_ids
        
        # 步骤7: 坏词过滤
        sampled_token_ids = self.bad_words.apply(sampled_token_ids, ...)
        
        # 步骤8: 计算对数概率
        logprobs = self.logprob_token_ids.compute_topk_logprobs(logits, ...)
        
        return SamplerOutput(
            sampled_token_ids=sampled_token_ids,
            logprob_token_ids=logprobs.token_ids,
            logprob_values=logprobs.values,
        )

9.3 Sampler 采样流程详解

LogprobCompute BadWordsState GumbelSample TopK/TopP Temperature LogitBiasState PenaltiesState Sampler GPUModelRunner LogprobCompute BadWordsState GumbelSample TopK/TopP Temperature LogitBiasState PenaltiesState Sampler GPUModelRunner alt [Greedy 采样] [Random 采样] forward(logits, sampling_metadata) apply(logits, req_states) penalized_logits apply(penalized_logits) biased_logits 分类: greedy / random argmax(biased_logits[greedy]) apply_temperature(biased_logits[random]) scaled_logits apply_top_k_top_p(scaled_logits) filtered_logits gumbel_sample(filtered_logits) random_token_ids apply(token_ids) compute_topk_logprobs(logits) SamplerOutput

十、SamplingStates 采样状态

10.1 概述

文件: gpu/sample/states.py (104行)

SamplingStates 管理所有请求的采样参数的 GPU 张量表示。

10.2 源码分析

NO_LOGPROBS = -1  # 不计算 logprobs 的标记

class SamplingStates:
    """采样状态 - 存储所有请求的采样参数的 GPU 张量。
    
    每个采样参数都是一个 GPU 张量 [max_num_reqs],
    索引对应请求在批次中的位置。
    
    使用 UvaBackedTensor 实现:
    - CPU 和 GPU 共享同一物理内存(统一虚拟地址)
    - 避免显式的 CPU->GPU 拷贝
    - CPU 端写入立即可在 GPU 端可见
    """
    
    def __init__(self, max_num_reqs: int, vocab_size: int):
        self.max_num_reqs = max_num_reqs
        self.vocab_size = vocab_size
        
        # === 采样参数张量 ===
        # 温度 - 控制采样的随机性
        # temperature=0: 贪心, temperature=1: 标准随机, temperature>1: 更随机
        self.temperature = UvaBackedTensor(max_num_reqs, dtype=torch.float32)
        
        # Top-K - 只保留概率最高的 K 个 token
        # top_k=1: 贪心, top_k=vocab_size: 不过滤
        self.top_k = UvaBackedTensor(max_num_reqs, dtype=torch.int32)
        
        # Top-P (nucleus sampling) - 累积概率阈值
        # top_p=0.9: 保留累积概率前 90% 的 token
        self.top_p = UvaBackedTensor(max_num_reqs, dtype=torch.float32)
        
        # Min-P - 最小概率阈值
        # 过滤掉概率低于最高概率 token 的 min_p 倍的 token
        self.min_p = UvaBackedTensor(max_num_reqs, dtype=torch.float32)
        
        # 随机种子 - 用于确定性随机采样
        self.seed = UvaBackedTensor(max_num_reqs, dtype=torch.int64)
        
        # === 分类掩码 (numpy) ===
        # 是否为贪心采样
        self.is_greedy = np.zeros(max_num_reqs, dtype=bool)
        # 是否为随机采样
        self.is_random = np.zeros(max_num_reqs, dtype=bool)

10.3 UvaBackedTensor 原理

UvaBackedTensor

统一虚拟地址
CPU/GPU 共享内存

零拷贝
CPU写入GPU立即可见

传统方式

copy_()

CPU Tensor

GPU Tensor

显式拷贝
带宽瓶颈


十一、Penalties 惩罚系统

11.1 概述

文件: gpu/sample/penalties.py (310行)

Penalties 系统实现了三种采样惩罚机制,用于控制模型生成内容的重复性。

11.2 三种惩罚类型

惩罚类型 公式 作用
Repetition Penalty logit_i *= penalty (if logit_i > 0)
logit_i /= penalty (if logit_i < 0)
降低已出现 token 的概率
Frequency Penalty logit_i -= frequency * count(token_i) 按出现次数线性降低概率
Presence Penalty logit_i -= presence (if count(token_i) > 0) 只要出现过就降低概率

11.3 源码分析

class PenaltiesState:
    """惩罚状态 - 管理各请求的惩罚参数和计算。
    
    惩罚机制的核心思想:
    - 已生成的 token 不应该以过高概率再次生成
    - 不同的惩罚策略适用于不同的场景
    - Repetition: 适用于需要避免任何重复的场景
    - Frequency: 适用于需要渐进降低重复概率的场景
    - Presence: 适用于需要鼓励多样性的场景
    """
    
    def __init__(self, req_states: RequestState):
        self.req_states = req_states
        
        max_num_reqs = req_states.max_num_reqs
        self.vocab_size = req_states.vocab_size
        self.device = req_states.device
        
        # 惩罚参数张量
        self.repetition_penalty = UvaBackedTensor(max_num_reqs, dtype=torch.float32)
        self.frequency_penalty = UvaBackedTensor(max_num_reqs, dtype=torch.float32)
        self.presence_penalty = UvaBackedTensor(max_num_reqs, dtype=torch.float32)
        
        # 是否使用惩罚的标记
        self.use_penalty = np.zeros(max_num_reqs, dtype=bool)
    
    def apply(
        self,
        logits: torch.Tensor,  # [num_reqs, vocab_size]
        ...
    ) -> torch.Tensor:
        """应用惩罚到 logits。
        
        执行步骤:
        1. 构建 token 出现次数矩阵
        2. 应用 repetition penalty
        3. 应用 frequency penalty
        4. 应用 presence penalty
        """
        if not self.use_penalty.any():
            return logits  # 无需惩罚,直接返回
        
        # 构建 token 出现频率矩阵
        # frequency_matrix[req_idx, token_id] = token_id 出现的次数
        frequency_matrix = self._build_frequency_matrix(...)
        
        # 应用各惩罚
        for i in range(num_reqs):
            if not self.use_penalty[i]:
                continue
            
            # Repetition Penalty
            rep_penalty = self.repetition_penalty[i]
            if rep_penalty != 1.0:
                # 正 logit 乘以惩罚,负 logit 除以惩罚
                positive_mask = logits[i] > 0
                logits[i][positive_mask] *= rep_penalty
                logits[i][~positive_mask] /= rep_penalty
            
            # Frequency Penalty
            freq_penalty = self.frequency_penalty[i]
            if freq_penalty != 0.0:
                logits[i] -= freq_penalty * frequency_matrix[i]
            
            # Presence Penalty
            pres_penalty = self.presence_penalty[i]
            if pres_penalty != 0.0:
                logits[i] -= pres_penalty * (frequency_matrix[i] > 0).float()
        
        return logits

11.4 惩罚效果可视化

Presence Penalty=0.5

token A: 2.0-0.5=1.5
降低

token B: 1.5-0=1.5
不变

token C: -0.5-0.5=-1.0
降低

Frequency Penalty=0.5

token A: 2.0-0.5*3=0.5
大幅降低

token B: 1.5-0.5*0=1.5
不变

token C: -0.5-0.5*1=-1.0
降低

Repetition Penalty=1.5

token A: 2.0*1.5=3.0
概率反而增大!

token B: 1.5*1.5=2.25
概率增大

token C: -0.5/1.5=-0.33
绝对值减小

正logit被放大
负logit被缩小
已出现token概率相对降低

原始 logits

token A: logit=2.0
已出现3次

token B: logit=1.5
已出现0次

token C: logit=-0.5
已出现1次


十二、Logprob 对数概率计算

12.1 概述

文件: gpu/sample/logprob.py (250行)

Logprob 模块计算采样 token 的对数概率,用于:

  • 返回 top-k 个 token 的概率分布
  • 调试和监控模型输出
  • 支持推测解码的验证

12.2 Triton Kernel 实现

@triton.jit
def _topk_log_softmax_kernel(
    output_ptr,       # 输出: top-k log softmax 值
    logits_ptr,       # 输入: logits
    logits_stride,    # logits 的行步长
    topk_ids_ptr,     # 输出: top-k token IDs
    topk,             # top-k 值
    vocab_size,       # 词汇表大小
    BLOCK_SIZE: tl.constexpr,    # Triton 块大小(编译时常量)
    PADDED_TOPK: tl.constexpr,   # 填充后的 top-k 大小
):
    """Triton kernel: 计算 log-softmax 并提取 top-k 结果。
    
    算法:
    1. 计算 logits 的最大值(数值稳定)
    2. 计算 log-softmax = log(exp(logit - max) / sum(exp(logit - max)))
    3. 选择 top-k 个最大的 log-softmax 值
    4. 写出结果
    
    优化:
    - 使用 Triton 自动向量化
    - BLOCK_SIZE 编译时常量允许循环展开
    - 单个 kernel 完成所有计算,避免多次 kernel launch
    """
    req_idx = tl.program_id(0)  # 请求索引
    row_ptr = logits_ptr + req_idx * logits_stride
    
    # 步骤1: 计算 logits 的最大值(用于数值稳定)
    max_val = float("-inf")
    for i in range(0, vocab_size, BLOCK_SIZE):
        block = i + tl.arange(0, BLOCK_SIZE)
        logits = tl.load(row_ptr + block, mask=block < vocab_size, other=float("-inf"))
        max_val = tl.max(tl.maximum(logits, max_val))
    max_val = max_val.to(tl.float32)
    
    # 步骤2: 计算 log-softmax
    # log_softmax = logit - max - log(sum(exp(logit - max)))
    sum_exp = 0.0
    for i in range(0, vocab_size, BLOCK_SIZE):
        block = i + tl.arange(0, BLOCK_SIZE)
        logits = tl.load(row_ptr + block, mask=block < vocab_size, other=float("-inf"))
        sum_exp += tl.sum(tl.exp(logits - max_val))
    
    log_sum_exp = tl.log(sum_exp)
    
    # 步骤3: 输出所有 log-softmax 值
    for i in range(0, vocab_size, BLOCK_SIZE):
        block = i + tl.arange(0, BLOCK_SIZE)
        logits = tl.load(row_ptr + block, mask=block < vocab_size, other=float("-inf"))
        log_softmax = logits - max_val - log_sum_exp
        tl.store(output_ptr + req_idx * vocab_size + block, log_softmax,
                 mask=block < vocab_size)

12.3 Logprob 计算流程

logits [num_reqs, vocab_size]

_topk_log_softmax_kernel

Step 1: 找最大值
max_val = max(logits)

Step 2: 求和
sum_exp = sum(exp(logits - max_val))

Step 3: log-softmax
= logit - max_val - log(sum_exp)

Step 4: 提取 top-k

topk_logprobs
[num_reqs, top_k]

topk_token_ids
[num_reqs, top_k]

12.4 LogprobTokenIdsState

class LogprobTokenIdsState:
    """Logprob token IDs 状态 - 管理 logprob 计算所需的缓冲区。"""
    
    def __init__(self, max_num_reqs, vocab_size, logprobs_mode):
        self.max_num_reqs = max_num_reqs
        self.vocab_size = vocab_size
        # 请求的 top-k 数量
        self.num_topk = np.zeros(max_num_reqs, dtype=np.int32)
    
    def compute_topk_logprobs(
        self,
        logits: torch.Tensor,
        ...
    ) -> LogprobsTensors:
        """计算 top-k logprobs。
        
        Returns:
            LogprobsTensors: 包含 token_ids 和 logprob 值
        """

十三、PromptLogprob 提示对数概率

13.1 概述

文件: gpu/sample/prompt_logprob.py (236行)

PromptLogprob 计算提示(prompt)中每个 token 的对数概率,用于评估模型对提示的理解程度。

13.2 特殊性

  • 提示 logprob 是在预填充阶段计算的
  • 每个提示 token 的 logprob 是条件概率 P(token_i | token_0, …, token_{i-1})
  • 用于评估提示的"可预测性"

13.3 计算方式

def compute_prompt_logprobs(
    logits: torch.Tensor,          # [num_prompt_tokens, vocab_size]
    prompt_token_ids: torch.Tensor, # [num_prompt_tokens]
    ...
) -> LogprobsTensors:
    """计算提示中每个 token 的对数概率。
    
    对于 prompt [t0, t1, t2, t3]:
    - logprob(t0) = 不计算(无上下文)
    - logprob(t1) = log P(t1 | t0)
    - logprob(t2) = log P(t2 | t0, t1)
    - logprob(t3) = log P(t3 | t0, t1, t2)
    
    使用 gather 操作高效提取:
    logprobs[i] = log_softmax(logits[i])[prompt_token_ids[i+1]]
    """

十四、LogitBias 偏置应用

14.1 概述

文件: gpu/sample/logit_bias.py (280行)

LogitBias 允许用户指定特定 token 的偏置值,在采样前添加到 logits 上。

14.2 源码分析

class LogitBiasState:
    """Logit 偏置状态 - 管理各请求的 logit 偏置。
    
    用例:
    - 提高/降低特定 token 的生成概率
    - 实现自定义的 token 过滤
    - 支持结构化生成的偏置引导
    
    数据格式:
    - logit_bias: [num_reqs, vocab_size] 偏置张量
    - has_logit_bias: [num_reqs] bool 标记
    """
    
    def __init__(self, ...):
        self.has_logit_bias = np.zeros(max_num_reqs, dtype=bool)
        # 稀疏存储: 只存储有偏置的 token
        self.logit_bias = torch.zeros(...)  # GPU 张量
    
    def apply(
        self,
        logits: torch.Tensor,
    ) -> torch.Tensor:
        """应用 logit 偏置。"""
        if not self.has_logit_bias.any():
            return logits
        
        for i in range(num_reqs):
            if self.has_logit_bias[i]:
                logits[i] += self.logit_bias[i]
        
        return logits

十五、MinP / Gumbel 采样

15.1 Min-P 采样

文件: gpu/sample/min_p.py (60行)

"""
Min-P 采样 - 最小概率阈值过滤。

原理:
1. 找到概率最高的 token,设其概率为 P_max
2. 设定阈值 min_p
3. 过滤掉所有概率 < P_max * min_p 的 token
4. 在剩余 token 中按概率采样

优势(相比 top-p):
- 自适应阈值: 阈值随最高概率动态调整
- 在高置信度时更贪心,低置信度时更多样
"""

15.2 Gumbel 采样

文件: gpu/sample/gumbel.py (211行)

"""
Gumbel 采样 - 使用 Gumbel-max 技巧进行高效采样。

Gumbel-Max 技巧:
1. 对每个 logit 添加 Gumbel 噪声: gumbel_logits = logits + Gumbel(0, 1)
2. 取 argmax(gumbel_logits)

数学等价性:
- argmax(logits + Gumbel) 等价于按 softmax(logits) 概率采样
- 但避免了计算 softmax 的开销
- 只需要一次 argmax 操作

优势:
- 计算效率高: 不需要 softmax + CDF 采样
- 数值稳定: Gumbel 噪声是加性的
- 可重现: 使用确定性种子
"""

def gumbel_sample(
    logits: torch.Tensor,    # [num_reqs, vocab_size]
    seeds: torch.Tensor,     # [num_reqs] 随机种子
) -> torch.Tensor:
    """使用 Gumbel-max 技巧采样。
    
    步骤:
    1. 使用种子生成 Gumbel(0,1) 噪声
    2. 添加噪声到 logits
    3. 取 argmax
    
    Returns:
        sampled_token_ids: [num_reqs]
    """
    # 生成 Gumbel 噪声
    # Gumbel(0,1) = -log(-log(Uniform(0,1)))
    uniform = torch.rand_like(logits)
    gumbel_noise = -torch.log(-torch.log(uniform + 1e-6) + 1e-6)
    
    # 添加噪声并 argmax
    gumbel_logits = logits + gumbel_noise
    return torch.argmax(gumbel_logits, dim=-1)

15.3 温度缩放

def apply_temperature(
    logits: torch.Tensor,    # [num_reqs, vocab_size]
    temperature: torch.Tensor, # [num_reqs]
) -> torch.Tensor:
    """应用温度缩放到 logits。
    
    公式: scaled_logits = logits / temperature
    
    效果:
    - temperature < 1: 更确定(峰值更尖锐)
    - temperature = 1: 不变
    - temperature > 1: 更随机(分布更平坦)
    """
    return logits / temperature.unsqueeze(-1)

15.4 采样方法对比

采样方法对比

温度=0

K增大

动态阈值

等价实现

Greedy
argmax(logits)
确定性输出
最高质量/最低多样性

Top-K
取前K个候选
随机采样
K=1=Greedy

Top-P (Nucleus)
累积概率截断
自适应候选数
最佳平衡

Min-P
相对概率阈值
自适应阈值
优于Top-P

Gumbel
Gumbel-max技巧
高效采样
数学等价于softmax采样


十六、BadWords 坏词过滤

16.1 概述

文件: gpu/sample/bad_words.py (194行)

BadWords 过滤模块防止模型生成用户指定的"坏词"。

16.2 实现原理

class BadWordsState:
    """坏词状态 - 管理坏词列表和过滤逻辑。
    
    过滤方式:
    - 将坏词的 token ID 在 logits 中设为 -inf
    - 确保采样时不会选中这些 token
    
    注意: 坏词可能是多 token 序列
    - 例如 "bad word" = [token_bad, token_word]
    - 只在已生成前缀匹配时过滤后续 token
    """
    
    def __init__(self, ...):
        # 坏词 token IDs 列表
        self.bad_words_token_ids = ...
    
    def apply(
        self,
        logits: torch.Tensor,
        ...
    ) -> torch.Tensor:
        """应用坏词过滤。"""
        # 将坏词 token 的 logit 设为 -inf
        for token_id in self.bad_words_token_ids:
            logits[:, token_id] = float("-inf")
        return logits

十七、StructuredOutputs 结构化输出

17.1 概述

文件: gpu/structured_outputs.py (115行)

StructuredOutputs 模块支持语法约束的采样,确保输出符合指定格式(如 JSON Schema、正则表达式)。

17.2 实现原理

"""
结构化输出 - 语法约束采样。

支持的语法引擎:
1. xGrammar: 基于上下文无关文法(CFG)
2. LALG: 基于正则表达式
3. JSON Schema: 基于JSON Schema规范

实现方式:
- 语法引擎生成一个 bitmask [num_reqs, vocab_size]
- bitmask[i][j] = 0 表示 token j 在位置 i 是非法的
- 在采样时应用 bitmask: 非法 token 的 logit 设为 -inf

性能优化:
- bitmask 在每个 token 位置动态更新
- 使用 GPU 并行计算 bitmask
- 支持 incremental 更新(只更新变化的部分)
"""

def apply_grammar_bitmask(
    logits: torch.Tensor,          # [num_reqs, vocab_size]
    grammar_bitmask: torch.Tensor, # [num_reqs, ceil(vocab_size/32)] 位掩码
    indices: list[int],            # 需要应用约束的请求索引
) -> None:
    """将语法约束 bitmask 应用到 logits。
    
    算法:
    1. 将 bitmask 解码为 token 级别的合法/非法标记
    2. 对非法 token 的 logit 设为 -inf
    
    注意: 这是 in-place 操作,直接修改 logits
    """
    for i in indices:
        # 解码 bitmask
        # bitmask[i] 是一个 uint32 数组,每个 bit 代表一个 token
        # bit=1 表示合法,bit=0 表示非法
        mask = _decode_bitmask(grammar_bitmask[i], vocab_size)
        logits[i][~mask] = float("-inf")

17.3 结构化输出流程

示例: JSON对象开头

合法: {
bitmask: 1

非法: 其他所有token
bitmask: 0

用户请求 JSON 输出

编译 JSON Schema 为 Grammar

Grammar 生成 Bitmask

Bitmask 标记合法/非法 Token

logits + Bitmask

非法 token logit = -inf

采样只选择合法 token

生成 token 更新 Grammar 状态

更新 Bitmask

17.4 Grammar Bitmask 编码

解码后

Bitmask 编码 (32 token/uint32)

uint32[0]
token 0-31
bit0=1: token_0 合法
bit1=0: token_1 非法
...

uint32[1]
token 32-63
...

uint32[N]
token N*32-end
...

mask[0] = True
token_0 合法

mask[1] = False
token_1 非法

mask[2] = True
token_2 合法


附录 A: InputBatch 与 Sampler 数据流总图

Sampler

GPUModelRunner

InputBatch

调度器

logits

SchedulerOutput

CachedRequestState
请求级缓存

update()

_update_inputs()

input_ids [num_tokens]

positions [num_tokens]

seq_lens [num_reqs]

block_table [num_reqs, max_blocks]

slot_mapping [num_tokens]

sampling_params

model.forward()

Penalties

LogitBias

Temperature

TopK/TopP

Gumbel

BadWords

Grammar

Logprobs

ModelRunnerOutput

附录 B: 采样参数速查表

参数 类型 默认值 说明
temperature float 1.0 温度,0=贪心,>0=随机
top_k int -1 Top-K,-1=不过滤
top_p float 1.0 Top-P,1.0=不过滤
min_p float 0.0 Min-P,0.0=不过滤
repetition_penalty float 1.0 重复惩罚,1.0=无惩罚
frequency_penalty float 0.0 频率惩罚,0.0=无惩罚
presence_penalty float 0.0 存在惩罚,0.0=无惩罚
seed int None 随机种子,None=非确定性
max_tokens int 16 最大生成 token 数
logprobs int 0 返回的 top-k logprob 数

附录 C: 微批次参数说明

参数 说明
max_ubatch_size 微批次最大请求数
max_ubatch_tokens 微批次最大 token 数
ubatch_slices 微批次在原始批次中的切片范围

附录 D: 关键类方法清单

InputBatch

方法 作用
add_request() 添加新请求到批次
remove_request() 移除已完成请求
update() 根据调度器输出更新批次
_update_inputs() 更新 GPU 输入张量
get_block_table() 获取块表
get_seq_lens() 获取序列长度

Sampler

方法 作用
init_gpu_tensors() 初始化 GPU 张量
update_state() 更新采样参数
forward() 执行采样

SamplingStates

方法 作用
update() 从 InputBatch 更新参数
init_gpu_tensors() 初始化 GPU 张量

Part 2 结束 - 本文件覆盖了输入批处理系统(InputBatch、CachedRequestState)和完整采样管线(Sampler、Penalties、Logprobs、LogitBias、MinP、Gumbel、BadWords、StructuredOutputs)。

Part 3 将深入分析: GPUModelRunner(7174行核心)、推测解码(EAGLE/RejectionSampler)、多模态编码、KV 连接器、LoRA、模型状态、Mamba 等。


附录 E: InputBatch 完整更新流程深度分析

E.1 update() 方法详细执行步骤

InputBatch.update() 是每个推理步骤的核心更新方法。完整执行步骤:

  1. 处理新请求 (scheduled_new_reqs):

    • 遍历调度器输出的新请求列表
    • 为每个请求分配批次索引
    • 创建 CachedRequestState 并填充初始状态
    • 更新块表 block_table.add_row()
    • 注册请求 ID 到索引的映射
  2. 处理恢复请求 (scheduled_cached_reqs):

    • 恢复被抢占(preempted)的请求
    • 更新已计算 token 数 (num_computed_tokens)
    • 更新块表 (可能有新分配的块)
    • 更新输出 token 数 (num_output_tokens)
  3. 处理 KV Cache 操作:

    • swap_in: CPU->GPU, 将换出的 KV Cache 块换回
    • swap_out: GPU->CPU, 将不活跃的 KV Cache 块换出
    • copy_on_write: 复制共享块,确保写时隔离
  4. 移除完成请求:

    • 清除请求的缓存状态
    • 清除块表行
    • 压缩批次(将最后请求移到空位)
  5. 更新 GPU 缓冲区 (_update_inputs):

    • 填充 input_ids [num_tokens]
    • 填充 positions [num_tokens]
    • 更新 seq_lens [num_reqs]
    • 更新 query_start_loc [num_reqs+1]
    • 提交 block_table 到 GPU

E.2 KV Cache Swap 操作流程

CPU KV Cache GPU KV Cache InputBatch Scheduler CPU KV Cache GPU KV Cache InputBatch Scheduler Swap In: CPU ->> GPU Swap Out: GPU ->> CPU Copy On Write 共享块复制,写时隔离 blocks_to_swap_in = [(cpu_block=10, gpu_block=5)] 读取 cpu_kv_cache[10] copy to gpu_kv_cache[5] blocks_to_swap_out = [(gpu_block=3, cpu_block=20)] 读取 gpu_kv_cache[3] copy to cpu_kv_cache[20] blocks_to_copy = [(src=5, dst=8)] gpu_kv_cache[8] = gpu_kv_cache[5]

E.3 Copy-On-Write 详解

Copy-On-Write (COW) 是 KV Cache 共享的关键机制:

当两个请求共享相同前缀时,它们共享 KV Cache 块。当其中一个请求需要修改(追加新 token)时,先复制共享块,然后在副本上修改。

Req 2 追加新 token D

共享前缀

引用

引用

仍引用

引用新块

追加

Req 1: prompt=A B C

Req 2: prompt=A B C

Block 5: KV for A B C
共享块

Block 8: KV for A B C
复制块 (COW)

Block 9: KV for D
新块


附录 F: 微批次执行全流程

F.1 UBatchWrapper 执行流程详解

Sampler model.forward UBatchWrapper UBatchUtils GPUModelRunner Sampler model.forward UBatchWrapper UBatchUtils GPUModelRunner loop [每个微批次] alt [不需要拆分] [需要拆分] check_ubatch_thresholds(num_reqs, num_tokens) None forward(full_batch) [UBatchSlices1, UBatchSlices2, ...] execute_ubatch(slices_i) 切分 input_ids, positions, seq_lens 构建微批次 attn_metadata forward(ubatch_inputs) hidden_states sample(logits, metadata) sampled_token_ids partial_output_i 合并所有微批次结果

F.2 微批次拆分算法

maybe_create_ubatch_slices() 的拆分策略:

  1. 尽量保持请求的完整性(不拆分单个请求的 token)
  2. 优先满足 max_ubatch_tokens 约束
  3. 在满足 token 约束的前提下尽量填满 max_ubatch_size
  4. 使用贪心算法: 依次添加请求,直到超过阈值

请求列表: req0(100tok), req1(200tok), req2(50tok), req3(300tok)

max_ubatch_tokens=256

UBatch 0: req0(100) + req2(50) = 150 <= 256

尝试加req1: 150+200=350 > 256, 停止

UBatch 1: req1(200) <= 256

尝试加req3: 200+300=500 > 256, 停止

UBatch 2: req3(300) > 256, 单独成批(允许超限)


附录 G: Sampler 完整执行流程图

G.1 Sampler.forward() 详细步骤

Greedy

Random

logits [num_reqs vocab_size]

PenaltiesState.apply()

Repetition Penalty
正logit乘以penalty
负logit除以penalty

Frequency Penalty
logit -= freq * count

Presence Penalty
logit -= pres * (count>0)

LogitBiasState.apply()

logits[i] += bias[i]
仅对有偏置的请求

分类: Greedy vs Random

argmax(logits[greedy_mask])

apply_temperature()

logits /= temperature

apply_top_k_top_p()

保留top-k个最大logit
保留累积概率top-p

apply_min_p()

过滤 prob < max_prob * min_p

gumbel_sample()

logits += Gumbel(0,1)
argmax(gumbel_logits)

合并 greedy + random 结果

BadWordsState.apply()

坏词logit设为-inf
重新采样替代token

Grammar bitmask
非法token logit = -inf

compute_topk_logprobs()

Triton kernel
log_softmax + top-k

SamplerOutput

G.2 采样参数分组策略

Sampler 将请求按采样类型分组,避免混合采样导致的条件分支:

分组执行

所有请求

req0: temp=0 (Greedy)

req1: temp=0.7 (Random)

req2: temp=0 (Greedy)

req3: temp=1.2 (Random)

Greedy Group
req0, req2
argmax(logits)

Random Group
req1, req3
temperature + top-k/p + gumbel

合并结果
按原始顺序排列


附录 H: Triton Kernel 优化详解

H.1 _topk_log_softmax_kernel 算法

这是采样管线中最关键的 Triton kernel,在 GPU 上并行计算所有请求的 log-softmax 并提取 top-k 结果。

算法步骤:

  1. 每个 Triton program 处理一个请求
  2. 分块扫描 logits,计算最大值 (数值稳定)
  3. 分块计算 exp(logit - max) 的和
  4. 计算 log_softmax = logit - max - log(sum_exp)
  5. 选择 top-k 个最大值

性能优化:

  • 使用 BLOCK_SIZE 编译时常量,允许循环展开
  • 单个 kernel 完成所有计算,避免多次 kernel launch
  • 使用 Triton 自动向量化

Triton 单kernel

单kernel: max + sum_exp + log_softmax + top-k

1次kernel launch
约10us开销
节省75%

传统方式 多kernel

kernel1: max

kernel2: sum_exp

kernel3: log_softmax

kernel4: top-k

4次kernel launch
约40us开销


附录 I: 采样管线性能优化汇总

优化技术 文件 加速原理 效果
UvaBackedTensor gpu/buffer_utils.py CPU-GPU零拷贝 减少传输延迟
Triton kernel gpu/sample/logprob.py 单kernel完成多步 减少75% launch开销
分组采样 gpu/sample/sampler.py 避免条件分支 提高GPU利用率
Gumbel-max gpu/sample/gumbel.py 避免softmax计算 减少计算量
Grammar bitmask gpu/structured_outputs.py 并行位操作 高效token过滤
CUDA Graph gpu/cudagraph_utils.py 批量kernel提交 减少67% launch开销

附录 J: InputBatch 与 SchedulerOutput 数据映射

J.1 完整字段映射表

SchedulerOutput 字段 InputBatch 处理 更新方法
scheduled_new_reqs 创建 CachedRequestState add_request()
scheduled_cached_reqs 更新已有请求状态 _update_resumed_request()
num_scheduled_tokens 更新 num_new_tokens 直接赋值
blocks_to_swap_in CPU->GPU KV Cache拷贝 直接操作 kv_cache tensor
blocks_to_swap_out GPU->CPU KV Cache拷贝 直接操作 kv_cache tensor
blocks_to_copy GPU内KV Cache复制 直接操作 kv_cache tensor
finished_req_ids 移除请求 remove_request()
preempted_req_ids 暂停请求(保留缓存) remove_request(不删缓存)
free_encoder_mm_hashes 清除编码器缓存 encoder_cache.pop()
new_block_ids_to_zero 清零新块 _zero_block_ids()

J.2 数据映射流程

GPU 张量更新

InputBatch 处理

SchedulerOutput

scheduled_new_reqs
scheduled_cached_reqs
num_scheduled_tokens
blocks_to_swap
blocks_to_copy
finished_req_ids
preempted_req_ids

add_request()
update_request()
swap_in/out
copy_on_write
remove_request()

input_ids
positions
seq_lens
block_table
slot_mapping
sampling_params


附录 K: SamplingMetadata 构建详解

K.1 SamplingMetadata 数据结构

SamplingMetadata 是采样器所需的元数据,包含每个请求的采样参数:

字段 形状 说明
temperature Tensor [num_reqs] 温度参数
top_k Tensor [num_reqs] Top-K参数
top_p Tensor [num_reqs] Top-P参数
min_p Tensor [num_reqs] Min-P参数
seed Tensor [num_reqs] 随机种子
repetition_penalty Tensor [num_reqs] 重复惩罚
frequency_penalty Tensor [num_reqs] 频率惩罚
presence_penalty Tensor [num_reqs] 存在惩罚
logit_bias Tensor [num_reqs, vocab_size] logit偏置
sampling_type SamplingType[] [num_reqs] 采样类型

K.2 SamplingType 分类逻辑

条件 SamplingType 采样方法
temperature==0 GREEDY argmax(logits)
top_k==1 GREEDY argmax(logits)
temperature>0 RANDOM temperature + top-k/p + gumbel

K.3 SamplingMetadata 构建流程

渲染错误: Mermaid 渲染失败: Parse error on line 11: ...nalty"] D1 and D2 and D3 and D4 ---------------------^ Expecting 'SEMI', 'NEWLINE', 'EOF', 'AMP', 'START_LINK', 'LINK', 'LINK_ID', got 'NODE_STRING'

附录 L: LogprobsTensors 数据结构

L.1 LogprobsTensors 字段

LogprobsTensors 存储采样后的对数概率结果:

字段 形状 说明
token_ids [num_reqs, max_logprobs] top-k token IDs
values [num_reqs, max_logprobs] top-k log概率值
normalized bool 是否已归一化

L.2 GPU-CPU 数据转换

GPU上的LogprobsTensors需要转换为CPU端的LogprobsLists才能返回给用户:

CPU

GPU

to_cpu_nonblocking()

LogprobsTensors
token_ids and values as Tensors

LogprobsLists
dict per request: token_id to logprob


附录 M: 采样管线性能分析

M.1 采样管线延迟分解

步骤 典型延迟 占比
Penalties apply 5us 3%
Logit bias apply 2us 1%
Temperature + TopK and TopP 15us 10%
Gumbel sample 10us 7%
Grammar bitmask 20us 13%
Logprob compute (Triton) 80us 53%
Bad words filter 3us 2%
GPU-CPU copy 15us 10%
Total ~150us 100%

M.2 采样管线与模型前向传播的时间比

模型前向传播占推理步骤的90-95% (约2-5ms),采样管线占3-8% (约0.1-0.2ms)。

虽然采样管线只占总推理时间的3-8%,但在高吞吐场景下,优化采样管线可以显著提高整体QPS。

M.3 采样管线优化方向

  1. Logprob Triton kernel优化: 占53%延迟,是最大瓶颈
  2. Grammar bitmask批量化: 占13%,使用更高效的位操作
  3. GPU-CPU异步拷贝: 占10%,使用独立CUDA Stream
  4. Penalties向量化: 占3%,使用GPU并行计算

附录 N: PromptLogprob 计算详解

N.1 提示对数概率计算方式

PromptLogprob 计算提示中每个token的条件概率:

对于 prompt [t0, t1, t2, t3]:

  • logprob(t0) 不计算(无上下文)
  • logprob(t1) = log P(t1 | t0)
  • logprob(t2) = log P(t2 | t0, t1)
  • logprob(t3) = log P(t3 | t0, t1, t2)

N.2 PromptLogprob 计算流程

渲染错误: Mermaid 渲染失败: Parse error on line 8: ...t[N]]"] C1 and C2 and C3 --> D[ ---------------------^ Expecting 'SEMI', 'NEWLINE', 'EOF', 'AMP', 'START_LINK', 'LINK', 'LINK_ID', got 'NODE_STRING'

附录 O: TPU InputBatch 特殊性

O.1 TPU 与 GPU 的关键差异

TPU InputBatch 与 GPU 版本的主要差异:

特性 GPU InputBatch TPU InputBatch
计算后端 CUDA/Triton XLA/JAX
形状约束 CUDA Graph要求固定形状 XLA编译要求固定形状
内存管理 CUDA显存 TPU HBM
通信 NCCL TPU Mesh
优化kernel Triton/CUDA custom XLA编译优化

O.2 XLA 编译 vs CUDA Graph

XLA 编译

编译计算图

执行编译后图

编译时优化
更激进的融合

CUDA Graph

捕获执行图

重放固定图

运行时捕获
无需重新编译

Logo

免费领 100 小时云算力,进群参与显卡、AI PC 幸运抽奖

更多推荐