【vllm】(v1 Worker)vLLM V1 Worker—Part 2输入批处理与采样系统
vLLM V1 Worker 超深度架构分析 — Part 2: 输入批处理与采样系统
vLLM V1 Worker 超深度架构分析 — Part 2: 输入批处理与采样系统
分析目标目录:
github.com/vllm/vllm/v1/worker
本卷聚焦: InputBatch 批次管理 | 采样管线 | Logprobs | 微批次调度
目录
- 输入批处理系统总览
- CachedRequestState 请求缓存状态
- InputBatch (顶层) 深度分析
- GPU InputBatch 深度分析
- TPU InputBatch 深度分析
- 微批次调度系统
- UBatchWrapper 微批次包装器
- 采样管线总览
- Sampler 采样器核心
- SamplingStates 采样状态
- Penalties 惩罚系统
- Logprob 对数概率计算
- PromptLogprob 提示对数概率
- LogitBias 偏置应用
- MinP / Gumbel 采样
- BadWords 坏词过滤
- StructuredOutputs 结构化输出
一、输入批处理系统总览
1.1 系统定位
输入批处理系统是 Worker 推理管线中调度器输出与模型执行之间的桥梁。它负责:
| 职责 | 说明 |
|---|---|
| 请求状态管理 | 缓存每个请求的采样参数、token 历史、LoRA 配置等 |
| 输入张量组装 | 将多个请求的 token 拼装成连续的 input_ids/positions 张量 |
| KV Cache 映射 | 维护 slot_mapping 和 block_table 映射关系 |
| 注意力元数据 | 构建 seq_lens、query_start_loc 等注意力计算所需的元数据 |
| 微批次拆分 | 将大批次拆分为多个微批次,优化 GPU 利用率 |
| 采样参数准备 | 为采样器准备 temperature、top_k、top_p 等参数张量 |
1.2 批处理架构图
1.3 数据流图
二、CachedRequestState 请求缓存状态
2.1 数据类定义
文件: gpu_input_batch.py (前部分)
@dataclass
class CachedRequestState:
"""缓存请求状态 - 存储单个请求的所有运行时状态。
每个活跃请求都有一个对应的 CachedRequestState 实例,
在请求的生命周期内持续更新。
设计目的:
- 避免每步都从调度器重新构建请求状态
- 缓存不变的状态(如 sampling_params),只更新变化的部分
- 为输入张量组装提供快速访问
"""
# === 基础请求信息 ===
request_id: str # 请求唯一标识符
prompt_token_ids: tuple[int, ...] # 提示 token IDs(不可变)
prompt_embeds: Optional[torch.Tensor] = None # 提示嵌入(可选,替代 token IDs)
# === 采样参数 ===
sampling_params: SamplingParams # 采样参数(temperature, top_k, top_p 等)
pooling_params: Optional[PoolingParams] = None # 池化参数(pooling 模型用)
# === 运行时状态 ===
num_computed_tokens: int = 0 # 已计算的 token 数(包括 prompt + generated)
block_ids: tuple[int, ...] = () # 分配的 KV Cache 物理块 ID 列表
num_new_tokens: int = 0 # 本步新调度的 token 数
# === LoRA 状态 ===
lora_request: Optional[LoRARequest] = None # LoRA 适配器请求
# === 多模态状态 ===
mm_feature_spec: Optional[MultiModalFeatureSpec] = None # 多模态特征规格
# === 输出状态 ===
output_token_ids: list[int] = field(default_factory=list) # 已生成的输出 token IDs
# === 推理模式 ===
is_floating_req: bool = False # 是否为"浮动"请求(预填充未完成)
2.2 CachedRequestState 生命周期
2.3 CachedRequestState 与 InputBatch 的关系
三、InputBatch (顶层) 深度分析
3.1 概述
文件: gpu_input_batch.py (1121行)
InputBatch 是顶层批次管理器,负责:
- 管理请求的增删改(add/remove/update)
- 维护 GPU 输入缓冲区(input_ids, positions, seq_lens 等)
- 管理块表映射(BlockTable)
- 构建注意力元数据所需的辅助信息
- 为采样器准备采样参数
3.2 类定义
class InputBatch:
"""输入批次 - 管理推理步骤的输入数据。
核心数据结构:
- requests: 请求数组 [max_num_reqs],存储 CachedRequestState
- request_ids_to_indices: 请求 ID 到批次索引的映射
- block_table: BlockTable 块表实例
GPU 缓冲区:
- input_ids: [max_num_tokens] 输入 token IDs
- positions: [max_num_tokens] 位置编码
- seq_lens: [max_num_reqs] 序列长度
- query_start_loc: [max_num_reqs + 1] 查询起始位置
设计理念:
- 预分配固定大小的 GPU 缓冲区,避免动态分配
- 使用 numpy 在 CPU 上构建,然后拷贝到 GPU
- 支持 CUDA Graph(固定大小张量)
"""
def __init__(
self,
vllm_config: VllmConfig,
device: torch.device,
):
# 配置参数
self.vllm_config = vllm_config
self.model_config = vllm_config.model_config
self.cache_config = vllm_config.cache_config
self.scheduler_config = vllm_config.scheduler_config
self.parallel_config = vllm_config.parallel_config
self.device = device
# 批次容量
self.max_num_reqs = scheduler_config.max_num_seqs
self.max_num_tokens = scheduler_config.max_num_batched_tokens
self.max_model_len = model_config.max_model_len
# 块大小
self.block_size = cache_config.block_size
self.max_num_blocks_per_req = cdiv(self.max_model_len, self.block_size)
# === 请求数据 ===
# 请求数组 - 预分配,运行时按需使用
self.requests: list[Optional[CachedRequestState]] = [None] * self.max_num_reqs
# 请求 ID -> 批次索引映射
self.request_ids_to_indices: dict[str, int] = {}
# 当前批次中的请求数
self.num_reqs = 0
# === GPU 输入缓冲区 ===
# 输入 token IDs - 包含所有请求的新 token
self.input_ids = torch.zeros(
self.max_num_tokens, dtype=torch.int32, device=device
)
# 位置编码 - 每个 token 在序列中的位置
self.positions = torch.zeros(
self.max_num_tokens, dtype=torch.int64, device=device
)
# 序列长度 - 每个请求的当前序列长度(含已生成的 token)
self.seq_lens = torch.zeros(
self.max_num_reqs, dtype=torch.int32, device=device
)
# 查询起始位置 - 用于标记每个请求的 token 在 input_ids 中的范围
# query_start_loc[i] = 第 i 个请求的 token 起始位置
# query_start_loc[i+1] - query_start_loc[i] = 第 i 个请求的 token 数
self.query_start_loc = torch.zeros(
self.max_num_reqs + 1, dtype=torch.int32, device=device
)
# === 块表 ===
self.block_table = BlockTable(
max_num_reqs=self.max_num_reqs,
max_num_blocks_per_req=self.max_num_blocks_per_req,
block_size=self.block_size,
pin_memory=True,
)
3.3 add_request() - 添加新请求
def add_request(
self,
request_id: str, # 请求 ID
prompt_token_ids: tuple[int, ...], # 提示 token IDs
sampling_params: SamplingParams, # 采样参数
block_ids: tuple[int, ...], # 分配的物理块 ID
num_computed_tokens: int, # 已计算的 token 数
lora_request: Optional[LoRARequest] = None, # LoRA 请求
pooling_params: Optional[PoolingParams] = None, # 池化参数
mm_feature_spec: Optional[MultiModalFeatureSpec] = None, # 多模态规格
) -> None:
"""将新请求添加到批次中。
执行步骤:
1. 找到空闲的批次索引
2. 创建 CachedRequestState
3. 更新块表
4. 注册请求 ID -> 索引映射
注意: 此方法只更新状态,不更新 GPU 缓冲区
GPU 缓冲区在 _update_inputs() 中统一更新
"""
# 步骤1: 分配批次索引
# 如果有已完成的请求留下的空位,优先使用
req_index = self.num_reqs # 简化: 使用下一个空位
# 步骤2: 创建缓存状态
self.requests[req_index] = CachedRequestState(
request_id=request_id,
prompt_token_ids=prompt_token_ids,
sampling_params=sampling_params,
block_ids=block_ids,
num_computed_tokens=num_computed_tokens,
lora_request=lora_request,
pooling_params=pooling_params,
mm_feature_spec=mm_feature_spec,
)
# 步骤3: 更新块表
self.block_table.add_row(req_index, list(block_ids))
# 步骤4: 注册映射
self.request_ids_to_indices[request_id] = req_index
self.num_reqs += 1
3.4 remove_request() - 移除完成的请求
def remove_request(self, request_id: str) -> None:
"""移除已完成的请求。
执行步骤:
1. 查找请求索引
2. 清除块表行
3. 清除缓存状态
4. 更新映射
5. 压缩批次(可选)
"""
req_index = self.request_ids_to_indices.pop(request_id)
self.requests[req_index] = None
self.block_table.remove_row(req_index)
self.num_reqs -= 1
# 如果不是最后一个请求,需要压缩
# 将最后一个请求移动到空位
if req_index < self.num_reqs:
last_index = self.num_reqs
self.requests[req_index] = self.requests[last_index]
self.block_table.move_row(last_index, req_index)
# 更新映射
moved_req = self.requests[req_index]
self.request_ids_to_indices[moved_req.request_id] = req_index
self.requests[last_index] = None
3.5 update() - 更新批次状态
def update(
self,
scheduler_output: SchedulerOutput,
) -> None:
"""根据调度器输出更新批次状态。
这是每个推理步骤的核心更新方法。
执行步骤:
1. 处理新请求 (add_request)
2. 处理恢复请求 (resume_request)
3. 处理 KV Cache swap/copy 操作
4. 处理完成的请求 (remove_request)
5. 更新已计算 token 数
6. 更新 GPU 缓冲区
"""
# 步骤1: 处理新请求
for new_req_data in scheduler_output.scheduled_new_reqs:
self.add_request(
request_id=new_req_data.request_id,
prompt_token_ids=new_req_data.prompt_token_ids,
sampling_params=new_req_data.sampling_params,
block_ids=new_req_data.block_ids,
num_computed_tokens=new_req_data.num_computed_tokens,
...
)
# 步骤2: 处理恢复请求(从 preemption 恢复)
for resumed_req in scheduler_output.scheduled_cached_reqs:
req_index = self.request_ids_to_indices[resumed_req.request_id]
# 更新块表和计算 token 数
self._update_resumed_request(req_index, resumed_req)
# 步骤3: 处理 KV Cache 操作
# swap in: CPU -> GPU
for swap_in_op in scheduler_output.blocks_to_swap_in:
...
# swap out: GPU -> CPU
for swap_out_op in scheduler_output.blocks_to_swap_out:
...
# copy-on-write: 共享块需要时复制
for copy_op in scheduler_output.blocks_to_copy:
...
# 步骤4: 移除完成的请求
for finished_req_id in scheduler_output.finished_req_ids:
self.remove_request(finished_req_id)
# 步骤5: 更新 GPU 输入缓冲区
self._update_inputs()
3.6 _update_inputs() - 更新 GPU 输入张量
def _update_inputs(self) -> None:
"""更新所有 GPU 输入缓冲区。
将 CPU 上的请求状态数据转换为 GPU 张量。
执行步骤:
1. 计算每个请求的新 token 及其位置
2. 填充 input_ids 和 positions
3. 更新 seq_lens 和 query_start_loc
4. 提交块表到 GPU
5. 构建采样参数张量
"""
offset = 0 # token 偏移量
for i in range(self.num_reqs):
req = self.requests[i]
if req is None:
continue
# 计算新 token 的范围
# 新 token = 从 num_computed_tokens 开始的 num_new_tokens 个 token
start = req.num_computed_tokens
num_new = req.num_new_tokens
# 填充 input_ids
if req.prompt_embeds is not None:
# 使用嵌入而非 token IDs
self.input_ids[offset:offset + num_new] = 0 # placeholder
else:
# 使用 token IDs
if start < len(req.prompt_token_ids):
# 预填充阶段: 取 prompt token
ids = req.prompt_token_ids[start:start + num_new]
else:
# 解码阶段: 取上一步生成的 token
gen_start = start - len(req.prompt_token_ids)
ids = req.output_token_ids[gen_start:gen_start + num_new]
self.input_ids[offset:offset + num_new] = torch.tensor(ids, dtype=torch.int32)
# 填充 positions
self.positions[offset:offset + num_new] = torch.arange(
start, start + num_new, dtype=torch.int64
)
# 更新 seq_lens
self.seq_lens[i] = start + num_new
# 更新 query_start_loc
self.query_start_loc[i + 1] = self.query_start_loc[i] + num_new
offset += num_new
# 提交块表到 GPU
self.block_table.commit(self.num_reqs)
3.7 InputBatch 操作流程图
3.8 input_ids / positions / seq_lens 布局示例
四、GPU InputBatch 深度分析
4.1 概述
文件: gpu/input_batch.py (588行)
GPU InputBatch 是 GPU 优化的批次实现,使用 Triton kernel 和 CUDA 优化的张量操作来构建输入数据。
4.2 InputBuffers 类
class InputBuffers:
"""GPU 输入缓冲区 - 预分配所有输入张量。
与顶层 InputBatch 的区别:
- 更底层,直接管理 GPU 张量
- 使用 Triton kernel 进行数据填充(GPU 端完成,无需 CPU-GPU 传输)
- 支持 CUDA Graph(固定大小张量)
缓冲区列表:
- input_ids: [max_num_tokens] 输入 token IDs
- positions: [max_num_tokens] 位置编码
- query_start_loc: [max_num_reqs+1] 查询起始位置
- seq_lens: [max_num_reqs] 序列长度
- dcp_local_seq_lens: [max_num_reqs] DCP 本地序列长度
"""
def __init__(
self,
max_num_reqs: int,
max_num_tokens: int,
device: torch.device,
):
self.max_num_reqs = max_num_reqs
self.max_num_tokens = max_num_tokens
self.device = device
# 预分配所有缓冲区
self.input_ids = torch.zeros(max_num_tokens, dtype=torch.int32, device=device)
self.positions = torch.zeros(max_num_tokens, dtype=torch.int64, device=device)
self.query_start_loc = torch.zeros(
max_num_reqs + 1, dtype=torch.int32, device=device
)
self.seq_lens = torch.zeros(max_num_reqs, dtype=torch.int32, device=device)
# DCP: per-request local seq_lens buffer
self.dcp_local_seq_lens = torch.zeros(
max_num_reqs, dtype=torch.int32, device=device
)
4.3 GPU InputBatch 类
class InputBatch:
"""GPU 优化的输入批次管理。
核心优化:
1. 使用 Triton kernel 在 GPU 端构建输入(避免 CPU-GPU 数据传输)
2. 预分配固定大小缓冲区(支持 CUDA Graph)
3. 使用 UvaBackedTensor 实现 CPU-GPU 统一虚拟地址访问
"""
def __init__(
self,
max_num_reqs: int,
max_num_tokens: int,
...
):
self.buffers = InputBuffers(max_num_reqs, max_num_tokens, device)
# ... 其他初始化
4.4 GPU vs 顶层 InputBatch 对比
五、TPU InputBatch 深度分析
5.1 概述
文件: tpu_input_batch.py (574行)
TPU InputBatch 是为 Google TPU 后端定制的批次管理实现。
5.2 TPU 与 GPU 的关键差异
class TPUInputBatch:
"""TPU 输入批次 - 为 TPU 后端定制。
TPU 特殊性:
- 使用 JAX/PyTorch XLA 而非 CUDA
- 编译时确定形状(XLA 编译需要静态形状)
- 无 CUDA Graph(使用 XLA 编译代替)
- 使用 TPU 特定的内存布局
"""
5.3 TPU vs GPU InputBatch 对比
六、微批次调度系统
6.1 概述
微批次(Micro-batching / UBatch)系统将大批次拆分为多个小批次,目的是:
- 提高 GPU 利用率 - 避免大批次导致的显存溢出
- 改善延迟 - 小批次更快完成,减少队头阻塞
- 支持 prefill-decode 分离 - 将 prefill 和 decode 拆开执行
6.2 ubatch_utils.py 分析
文件: ubatch_utils.py (265行)
"""
微批次工具 - 提供微批次的创建、拆分和合并功能。
核心数据结构:
- UBatchSlices: 描述一个微批次在原始批次中的切片范围
"""
@dataclass
class UBatchSlices:
"""微批次切片 - 描述微批次在原始批次中的范围。
属性:
req_start: 请求起始索引
req_end: 请求结束索引(不含)
token_start: token 起始索引
token_end: token 结束索引(不含)
"""
req_start: int
req_end: int
token_start: int
token_end: int
def check_ubatch_thresholds(
num_reqs: int,
num_tokens: int,
max_ubatch_size: int,
max_ubatch_tokens: int,
) -> bool:
"""检查是否需要拆分微批次。
判断条件:
- num_reqs > max_ubatch_size → 需要拆分
- num_tokens > max_ubatch_tokens → 需要拆分
Returns:
True 表示需要拆分
"""
return num_reqs > max_ubatch_size or num_tokens > max_ubatch_tokens
def maybe_create_ubatch_slices(
num_reqs: int,
num_tokens: int,
query_start_loc: torch.Tensor,
max_ubatch_size: int,
max_ubatch_tokens: int,
) -> list[UBatchSlices] | None:
"""尝试创建微批次切片。
如果不需要拆分,返回 None。
如果需要拆分,返回切片列表。
拆分策略:
1. 尽量保持请求的完整性(不拆分单个请求)
2. 优先满足 max_ubatch_tokens 约束
3. 在满足 token 约束的前提下尽量填满 max_ubatch_size
"""
if not check_ubatch_thresholds(num_reqs, num_tokens, ...):
return None
slices = []
req_start = 0
while req_start < num_reqs:
# 找到满足约束的最大请求范围
req_end = _find_ubatch_boundary(
req_start, num_reqs, query_start_loc, ...
)
slices.append(UBatchSlices(
req_start=req_start,
req_end=req_end,
token_start=query_start_loc[req_start].item(),
token_end=query_start_loc[req_end].item(),
))
req_start = req_end
return slices
def split_attn_metadata(
attn_metadata: AttentionMetadata,
ubatch_slices: list[UBatchSlices],
) -> list[AttentionMetadata]:
"""将注意力元数据按微批次切片拆分。
每个微批次需要独立的注意力元数据,
因为注意力计算是按批次独立执行的。
"""
6.3 ubatching.py 分析
文件: ubatching.py (241行)
"""
微批次调度逻辑 - 决定如何将批次拆分为微批次。
调度策略:
1. Prefill-Decode 分离: 将 prefill 和 decode 请求分别组批
2. 按大小排序: 将相似大小的请求组在一起
3. 自适应拆分: 根据当前 GPU 负载动态调整微批次大小
"""
6.4 微批次调度流程
6.5 微批次拆分示意
七、UBatchWrapper 微批次包装器
7.1 概述
文件: gpu_ubatch_wrapper.py (510行)
UBatchWrapper 是微批次的执行包装器,负责:
- 管理微批次的输入/输出张量切片
- 调用模型前向传播和采样
- 将微批次结果合并到完整批次
7.2 核心方法
class UBatchWrapper:
"""微批次包装器 - 封装微批次的执行逻辑。"""
def __init__(
self,
model_runner: GPUModelRunner,
...
):
self.model_runner = model_runner
def execute_ubatch(
self,
ubatch_slices: UBatchSlices,
input_batch: InputBatch,
...
) -> ModelRunnerOutput:
"""执行一个微批次的推理。
步骤:
1. 切分输入张量
2. 构建微批次的注意力元数据
3. 执行前向传播
4. 执行采样
5. 返回结果
"""
# 1. 切分输入
input_ids = input_batch.input_ids[
ubatch_slices.token_start:ubatch_slices.token_end
]
positions = input_batch.positions[
ubatch_slices.token_start:ubatch_slices.token_end
]
# ... 其他张量切分
# 2. 构建微批次注意力元数据
ubatch_attn_metadata = split_attn_metadata(...)
# 3. 前向传播
hidden_states = self.model_runner.model.forward(
input_ids=input_ids,
positions=positions,
kv_cache=kv_cache,
attn_metadata=ubatch_attn_metadata,
)
# 4. 采样
output = self.model_runner.sampler(...)
return output
7.3 UBatchWrapper 执行流程
八、采样管线总览
8.1 采样管线架构
采样管线是模型前向传播之后的处理阶段,将 logits 转换为最终的 token ID。
8.2 采样管线类图
8.3 采样管线执行流程
九、Sampler 采样器核心
9.1 概述
文件: gpu/sample/sampler.py (197行)
Sampler 是采样管线的入口和协调者,将各采样组件串联执行。
9.2 源码分析
class Sampler:
"""采样器 - 协调所有采样组件的执行。
采样流程:
1. 应用惩罚(重复/频率/存在)
2. 应用 logit 偏置
3. 应用温度缩放
4. 应用 top-k / top-p 过滤
5. 执行采样(贪心/随机/Gumbel)
6. 应用坏词过滤
7. 应用语法约束
8. 计算对数概率
"""
def __init__(
self,
max_num_reqs: int, # 最大请求数
vocab_size: int, # 词汇表大小
logprobs_mode: LogprobsMode, # 对数概率模式
):
self.max_num_reqs = max_num_reqs
self.vocab_size = vocab_size
self.logprobs_mode = logprobs_mode
# 初始化各采样组件
self.states = SamplingStates(max_num_reqs, vocab_size)
self.penalties = PenaltiesState(...)
self.logit_bias = LogitBiasState(...)
self.bad_words = BadWordsState(...)
self.logprob_token_ids = LogprobTokenIdsState(...)
def init_gpu_tensors(self, device: torch.device) -> None:
"""初始化 GPU 张量。"""
self.states.init_gpu_tensors(device)
self.penalties.init_gpu_tensors(device)
self.logit_bias.init_gpu_tensors(device)
def update_state(self, input_batch) -> None:
"""更新采样状态。
从 InputBatch 中提取各请求的采样参数,
更新到 GPU 张量中。
"""
self.states.update(input_batch)
self.penalties.update(input_batch)
self.logit_bias.update(input_batch)
self.bad_words.update(input_batch)
def forward(
self,
logits: torch.Tensor, # [num_reqs, vocab_size]
sampling_metadata: SamplingMetadata, # 采样元数据
) -> SamplerOutput:
"""执行采样。
Args:
logits: 模型输出的原始 logits
sampling_metadata: 采样参数元数据
Returns:
SamplerOutput: 包含 sampled_token_ids 和 logprobs
"""
# 步骤1: 应用惩罚
logits = self.penalties.apply(logits, ...)
# 步骤2: 应用 logit 偏置
logits = self.logit_bias.apply(logits, ...)
# 步骤3: 分类采样 - 根据采样类型分组
# 贪心采样组
greedy_mask = self.states.is_greedy[:num_reqs]
# 随机采样组
random_mask = self.states.is_random[:num_reqs]
# 步骤4: 贪心采样
if greedy_mask.any():
greedy_token_ids = torch.argmax(logits[greedy_mask], dim=-1)
# 步骤5: 随机采样
if random_mask.any():
# 应用温度
temp = self.states.temperature[:num_reqs][random_mask]
logits[random_mask] = logits[random_mask] / temp.unsqueeze(-1)
# 应用 top-k / top-p
logits[random_mask] = apply_top_k_top_p(
logits[random_mask],
self.states.top_k[:num_reqs][random_mask],
self.states.top_p[:num_reqs][random_mask],
)
# 应用 min-p
logits[random_mask] = apply_min_p(
logits[random_mask],
self.states.min_p[:num_reqs][random_mask],
)
# 采样
random_token_ids = gumbel_sample(
logits[random_mask],
self.states.seed[:num_reqs][random_mask],
)
# 步骤6: 合并结果
sampled_token_ids = torch.empty(num_reqs, dtype=torch.int32)
sampled_token_ids[greedy_mask] = greedy_token_ids
sampled_token_ids[random_mask] = random_token_ids
# 步骤7: 坏词过滤
sampled_token_ids = self.bad_words.apply(sampled_token_ids, ...)
# 步骤8: 计算对数概率
logprobs = self.logprob_token_ids.compute_topk_logprobs(logits, ...)
return SamplerOutput(
sampled_token_ids=sampled_token_ids,
logprob_token_ids=logprobs.token_ids,
logprob_values=logprobs.values,
)
9.3 Sampler 采样流程详解
十、SamplingStates 采样状态
10.1 概述
文件: gpu/sample/states.py (104行)
SamplingStates 管理所有请求的采样参数的 GPU 张量表示。
10.2 源码分析
NO_LOGPROBS = -1 # 不计算 logprobs 的标记
class SamplingStates:
"""采样状态 - 存储所有请求的采样参数的 GPU 张量。
每个采样参数都是一个 GPU 张量 [max_num_reqs],
索引对应请求在批次中的位置。
使用 UvaBackedTensor 实现:
- CPU 和 GPU 共享同一物理内存(统一虚拟地址)
- 避免显式的 CPU->GPU 拷贝
- CPU 端写入立即可在 GPU 端可见
"""
def __init__(self, max_num_reqs: int, vocab_size: int):
self.max_num_reqs = max_num_reqs
self.vocab_size = vocab_size
# === 采样参数张量 ===
# 温度 - 控制采样的随机性
# temperature=0: 贪心, temperature=1: 标准随机, temperature>1: 更随机
self.temperature = UvaBackedTensor(max_num_reqs, dtype=torch.float32)
# Top-K - 只保留概率最高的 K 个 token
# top_k=1: 贪心, top_k=vocab_size: 不过滤
self.top_k = UvaBackedTensor(max_num_reqs, dtype=torch.int32)
# Top-P (nucleus sampling) - 累积概率阈值
# top_p=0.9: 保留累积概率前 90% 的 token
self.top_p = UvaBackedTensor(max_num_reqs, dtype=torch.float32)
# Min-P - 最小概率阈值
# 过滤掉概率低于最高概率 token 的 min_p 倍的 token
self.min_p = UvaBackedTensor(max_num_reqs, dtype=torch.float32)
# 随机种子 - 用于确定性随机采样
self.seed = UvaBackedTensor(max_num_reqs, dtype=torch.int64)
# === 分类掩码 (numpy) ===
# 是否为贪心采样
self.is_greedy = np.zeros(max_num_reqs, dtype=bool)
# 是否为随机采样
self.is_random = np.zeros(max_num_reqs, dtype=bool)
10.3 UvaBackedTensor 原理
十一、Penalties 惩罚系统
11.1 概述
文件: gpu/sample/penalties.py (310行)
Penalties 系统实现了三种采样惩罚机制,用于控制模型生成内容的重复性。
11.2 三种惩罚类型
| 惩罚类型 | 公式 | 作用 |
|---|---|---|
| Repetition Penalty | logit_i *= penalty (if logit_i > 0)logit_i /= penalty (if logit_i < 0) |
降低已出现 token 的概率 |
| Frequency Penalty | logit_i -= frequency * count(token_i) |
按出现次数线性降低概率 |
| Presence Penalty | logit_i -= presence (if count(token_i) > 0) |
只要出现过就降低概率 |
11.3 源码分析
class PenaltiesState:
"""惩罚状态 - 管理各请求的惩罚参数和计算。
惩罚机制的核心思想:
- 已生成的 token 不应该以过高概率再次生成
- 不同的惩罚策略适用于不同的场景
- Repetition: 适用于需要避免任何重复的场景
- Frequency: 适用于需要渐进降低重复概率的场景
- Presence: 适用于需要鼓励多样性的场景
"""
def __init__(self, req_states: RequestState):
self.req_states = req_states
max_num_reqs = req_states.max_num_reqs
self.vocab_size = req_states.vocab_size
self.device = req_states.device
# 惩罚参数张量
self.repetition_penalty = UvaBackedTensor(max_num_reqs, dtype=torch.float32)
self.frequency_penalty = UvaBackedTensor(max_num_reqs, dtype=torch.float32)
self.presence_penalty = UvaBackedTensor(max_num_reqs, dtype=torch.float32)
# 是否使用惩罚的标记
self.use_penalty = np.zeros(max_num_reqs, dtype=bool)
def apply(
self,
logits: torch.Tensor, # [num_reqs, vocab_size]
...
) -> torch.Tensor:
"""应用惩罚到 logits。
执行步骤:
1. 构建 token 出现次数矩阵
2. 应用 repetition penalty
3. 应用 frequency penalty
4. 应用 presence penalty
"""
if not self.use_penalty.any():
return logits # 无需惩罚,直接返回
# 构建 token 出现频率矩阵
# frequency_matrix[req_idx, token_id] = token_id 出现的次数
frequency_matrix = self._build_frequency_matrix(...)
# 应用各惩罚
for i in range(num_reqs):
if not self.use_penalty[i]:
continue
# Repetition Penalty
rep_penalty = self.repetition_penalty[i]
if rep_penalty != 1.0:
# 正 logit 乘以惩罚,负 logit 除以惩罚
positive_mask = logits[i] > 0
logits[i][positive_mask] *= rep_penalty
logits[i][~positive_mask] /= rep_penalty
# Frequency Penalty
freq_penalty = self.frequency_penalty[i]
if freq_penalty != 0.0:
logits[i] -= freq_penalty * frequency_matrix[i]
# Presence Penalty
pres_penalty = self.presence_penalty[i]
if pres_penalty != 0.0:
logits[i] -= pres_penalty * (frequency_matrix[i] > 0).float()
return logits
11.4 惩罚效果可视化
十二、Logprob 对数概率计算
12.1 概述
文件: gpu/sample/logprob.py (250行)
Logprob 模块计算采样 token 的对数概率,用于:
- 返回 top-k 个 token 的概率分布
- 调试和监控模型输出
- 支持推测解码的验证
12.2 Triton Kernel 实现
@triton.jit
def _topk_log_softmax_kernel(
output_ptr, # 输出: top-k log softmax 值
logits_ptr, # 输入: logits
logits_stride, # logits 的行步长
topk_ids_ptr, # 输出: top-k token IDs
topk, # top-k 值
vocab_size, # 词汇表大小
BLOCK_SIZE: tl.constexpr, # Triton 块大小(编译时常量)
PADDED_TOPK: tl.constexpr, # 填充后的 top-k 大小
):
"""Triton kernel: 计算 log-softmax 并提取 top-k 结果。
算法:
1. 计算 logits 的最大值(数值稳定)
2. 计算 log-softmax = log(exp(logit - max) / sum(exp(logit - max)))
3. 选择 top-k 个最大的 log-softmax 值
4. 写出结果
优化:
- 使用 Triton 自动向量化
- BLOCK_SIZE 编译时常量允许循环展开
- 单个 kernel 完成所有计算,避免多次 kernel launch
"""
req_idx = tl.program_id(0) # 请求索引
row_ptr = logits_ptr + req_idx * logits_stride
# 步骤1: 计算 logits 的最大值(用于数值稳定)
max_val = float("-inf")
for i in range(0, vocab_size, BLOCK_SIZE):
block = i + tl.arange(0, BLOCK_SIZE)
logits = tl.load(row_ptr + block, mask=block < vocab_size, other=float("-inf"))
max_val = tl.max(tl.maximum(logits, max_val))
max_val = max_val.to(tl.float32)
# 步骤2: 计算 log-softmax
# log_softmax = logit - max - log(sum(exp(logit - max)))
sum_exp = 0.0
for i in range(0, vocab_size, BLOCK_SIZE):
block = i + tl.arange(0, BLOCK_SIZE)
logits = tl.load(row_ptr + block, mask=block < vocab_size, other=float("-inf"))
sum_exp += tl.sum(tl.exp(logits - max_val))
log_sum_exp = tl.log(sum_exp)
# 步骤3: 输出所有 log-softmax 值
for i in range(0, vocab_size, BLOCK_SIZE):
block = i + tl.arange(0, BLOCK_SIZE)
logits = tl.load(row_ptr + block, mask=block < vocab_size, other=float("-inf"))
log_softmax = logits - max_val - log_sum_exp
tl.store(output_ptr + req_idx * vocab_size + block, log_softmax,
mask=block < vocab_size)
12.3 Logprob 计算流程
12.4 LogprobTokenIdsState
class LogprobTokenIdsState:
"""Logprob token IDs 状态 - 管理 logprob 计算所需的缓冲区。"""
def __init__(self, max_num_reqs, vocab_size, logprobs_mode):
self.max_num_reqs = max_num_reqs
self.vocab_size = vocab_size
# 请求的 top-k 数量
self.num_topk = np.zeros(max_num_reqs, dtype=np.int32)
def compute_topk_logprobs(
self,
logits: torch.Tensor,
...
) -> LogprobsTensors:
"""计算 top-k logprobs。
Returns:
LogprobsTensors: 包含 token_ids 和 logprob 值
"""
十三、PromptLogprob 提示对数概率
13.1 概述
文件: gpu/sample/prompt_logprob.py (236行)
PromptLogprob 计算提示(prompt)中每个 token 的对数概率,用于评估模型对提示的理解程度。
13.2 特殊性
- 提示 logprob 是在预填充阶段计算的
- 每个提示 token 的 logprob 是条件概率 P(token_i | token_0, …, token_{i-1})
- 用于评估提示的"可预测性"
13.3 计算方式
def compute_prompt_logprobs(
logits: torch.Tensor, # [num_prompt_tokens, vocab_size]
prompt_token_ids: torch.Tensor, # [num_prompt_tokens]
...
) -> LogprobsTensors:
"""计算提示中每个 token 的对数概率。
对于 prompt [t0, t1, t2, t3]:
- logprob(t0) = 不计算(无上下文)
- logprob(t1) = log P(t1 | t0)
- logprob(t2) = log P(t2 | t0, t1)
- logprob(t3) = log P(t3 | t0, t1, t2)
使用 gather 操作高效提取:
logprobs[i] = log_softmax(logits[i])[prompt_token_ids[i+1]]
"""
十四、LogitBias 偏置应用
14.1 概述
文件: gpu/sample/logit_bias.py (280行)
LogitBias 允许用户指定特定 token 的偏置值,在采样前添加到 logits 上。
14.2 源码分析
class LogitBiasState:
"""Logit 偏置状态 - 管理各请求的 logit 偏置。
用例:
- 提高/降低特定 token 的生成概率
- 实现自定义的 token 过滤
- 支持结构化生成的偏置引导
数据格式:
- logit_bias: [num_reqs, vocab_size] 偏置张量
- has_logit_bias: [num_reqs] bool 标记
"""
def __init__(self, ...):
self.has_logit_bias = np.zeros(max_num_reqs, dtype=bool)
# 稀疏存储: 只存储有偏置的 token
self.logit_bias = torch.zeros(...) # GPU 张量
def apply(
self,
logits: torch.Tensor,
) -> torch.Tensor:
"""应用 logit 偏置。"""
if not self.has_logit_bias.any():
return logits
for i in range(num_reqs):
if self.has_logit_bias[i]:
logits[i] += self.logit_bias[i]
return logits
十五、MinP / Gumbel 采样
15.1 Min-P 采样
文件: gpu/sample/min_p.py (60行)
"""
Min-P 采样 - 最小概率阈值过滤。
原理:
1. 找到概率最高的 token,设其概率为 P_max
2. 设定阈值 min_p
3. 过滤掉所有概率 < P_max * min_p 的 token
4. 在剩余 token 中按概率采样
优势(相比 top-p):
- 自适应阈值: 阈值随最高概率动态调整
- 在高置信度时更贪心,低置信度时更多样
"""
15.2 Gumbel 采样
文件: gpu/sample/gumbel.py (211行)
"""
Gumbel 采样 - 使用 Gumbel-max 技巧进行高效采样。
Gumbel-Max 技巧:
1. 对每个 logit 添加 Gumbel 噪声: gumbel_logits = logits + Gumbel(0, 1)
2. 取 argmax(gumbel_logits)
数学等价性:
- argmax(logits + Gumbel) 等价于按 softmax(logits) 概率采样
- 但避免了计算 softmax 的开销
- 只需要一次 argmax 操作
优势:
- 计算效率高: 不需要 softmax + CDF 采样
- 数值稳定: Gumbel 噪声是加性的
- 可重现: 使用确定性种子
"""
def gumbel_sample(
logits: torch.Tensor, # [num_reqs, vocab_size]
seeds: torch.Tensor, # [num_reqs] 随机种子
) -> torch.Tensor:
"""使用 Gumbel-max 技巧采样。
步骤:
1. 使用种子生成 Gumbel(0,1) 噪声
2. 添加噪声到 logits
3. 取 argmax
Returns:
sampled_token_ids: [num_reqs]
"""
# 生成 Gumbel 噪声
# Gumbel(0,1) = -log(-log(Uniform(0,1)))
uniform = torch.rand_like(logits)
gumbel_noise = -torch.log(-torch.log(uniform + 1e-6) + 1e-6)
# 添加噪声并 argmax
gumbel_logits = logits + gumbel_noise
return torch.argmax(gumbel_logits, dim=-1)
15.3 温度缩放
def apply_temperature(
logits: torch.Tensor, # [num_reqs, vocab_size]
temperature: torch.Tensor, # [num_reqs]
) -> torch.Tensor:
"""应用温度缩放到 logits。
公式: scaled_logits = logits / temperature
效果:
- temperature < 1: 更确定(峰值更尖锐)
- temperature = 1: 不变
- temperature > 1: 更随机(分布更平坦)
"""
return logits / temperature.unsqueeze(-1)
15.4 采样方法对比
十六、BadWords 坏词过滤
16.1 概述
文件: gpu/sample/bad_words.py (194行)
BadWords 过滤模块防止模型生成用户指定的"坏词"。
16.2 实现原理
class BadWordsState:
"""坏词状态 - 管理坏词列表和过滤逻辑。
过滤方式:
- 将坏词的 token ID 在 logits 中设为 -inf
- 确保采样时不会选中这些 token
注意: 坏词可能是多 token 序列
- 例如 "bad word" = [token_bad, token_word]
- 只在已生成前缀匹配时过滤后续 token
"""
def __init__(self, ...):
# 坏词 token IDs 列表
self.bad_words_token_ids = ...
def apply(
self,
logits: torch.Tensor,
...
) -> torch.Tensor:
"""应用坏词过滤。"""
# 将坏词 token 的 logit 设为 -inf
for token_id in self.bad_words_token_ids:
logits[:, token_id] = float("-inf")
return logits
十七、StructuredOutputs 结构化输出
17.1 概述
文件: gpu/structured_outputs.py (115行)
StructuredOutputs 模块支持语法约束的采样,确保输出符合指定格式(如 JSON Schema、正则表达式)。
17.2 实现原理
"""
结构化输出 - 语法约束采样。
支持的语法引擎:
1. xGrammar: 基于上下文无关文法(CFG)
2. LALG: 基于正则表达式
3. JSON Schema: 基于JSON Schema规范
实现方式:
- 语法引擎生成一个 bitmask [num_reqs, vocab_size]
- bitmask[i][j] = 0 表示 token j 在位置 i 是非法的
- 在采样时应用 bitmask: 非法 token 的 logit 设为 -inf
性能优化:
- bitmask 在每个 token 位置动态更新
- 使用 GPU 并行计算 bitmask
- 支持 incremental 更新(只更新变化的部分)
"""
def apply_grammar_bitmask(
logits: torch.Tensor, # [num_reqs, vocab_size]
grammar_bitmask: torch.Tensor, # [num_reqs, ceil(vocab_size/32)] 位掩码
indices: list[int], # 需要应用约束的请求索引
) -> None:
"""将语法约束 bitmask 应用到 logits。
算法:
1. 将 bitmask 解码为 token 级别的合法/非法标记
2. 对非法 token 的 logit 设为 -inf
注意: 这是 in-place 操作,直接修改 logits
"""
for i in indices:
# 解码 bitmask
# bitmask[i] 是一个 uint32 数组,每个 bit 代表一个 token
# bit=1 表示合法,bit=0 表示非法
mask = _decode_bitmask(grammar_bitmask[i], vocab_size)
logits[i][~mask] = float("-inf")
17.3 结构化输出流程
17.4 Grammar Bitmask 编码
附录 A: InputBatch 与 Sampler 数据流总图
附录 B: 采样参数速查表
| 参数 | 类型 | 默认值 | 说明 |
|---|---|---|---|
temperature |
float | 1.0 | 温度,0=贪心,>0=随机 |
top_k |
int | -1 | Top-K,-1=不过滤 |
top_p |
float | 1.0 | Top-P,1.0=不过滤 |
min_p |
float | 0.0 | Min-P,0.0=不过滤 |
repetition_penalty |
float | 1.0 | 重复惩罚,1.0=无惩罚 |
frequency_penalty |
float | 0.0 | 频率惩罚,0.0=无惩罚 |
presence_penalty |
float | 0.0 | 存在惩罚,0.0=无惩罚 |
seed |
int | None | 随机种子,None=非确定性 |
max_tokens |
int | 16 | 最大生成 token 数 |
logprobs |
int | 0 | 返回的 top-k logprob 数 |
附录 C: 微批次参数说明
| 参数 | 说明 |
|---|---|
max_ubatch_size |
微批次最大请求数 |
max_ubatch_tokens |
微批次最大 token 数 |
ubatch_slices |
微批次在原始批次中的切片范围 |
附录 D: 关键类方法清单
InputBatch
| 方法 | 作用 |
|---|---|
add_request() |
添加新请求到批次 |
remove_request() |
移除已完成请求 |
update() |
根据调度器输出更新批次 |
_update_inputs() |
更新 GPU 输入张量 |
get_block_table() |
获取块表 |
get_seq_lens() |
获取序列长度 |
Sampler
| 方法 | 作用 |
|---|---|
init_gpu_tensors() |
初始化 GPU 张量 |
update_state() |
更新采样参数 |
forward() |
执行采样 |
SamplingStates
| 方法 | 作用 |
|---|---|
update() |
从 InputBatch 更新参数 |
init_gpu_tensors() |
初始化 GPU 张量 |
Part 2 结束 - 本文件覆盖了输入批处理系统(InputBatch、CachedRequestState)和完整采样管线(Sampler、Penalties、Logprobs、LogitBias、MinP、Gumbel、BadWords、StructuredOutputs)。
Part 3 将深入分析: GPUModelRunner(7174行核心)、推测解码(EAGLE/RejectionSampler)、多模态编码、KV 连接器、LoRA、模型状态、Mamba 等。
附录 E: InputBatch 完整更新流程深度分析
E.1 update() 方法详细执行步骤
InputBatch.update() 是每个推理步骤的核心更新方法。完整执行步骤:
-
处理新请求 (scheduled_new_reqs):
- 遍历调度器输出的新请求列表
- 为每个请求分配批次索引
- 创建 CachedRequestState 并填充初始状态
- 更新块表 block_table.add_row()
- 注册请求 ID 到索引的映射
-
处理恢复请求 (scheduled_cached_reqs):
- 恢复被抢占(preempted)的请求
- 更新已计算 token 数 (num_computed_tokens)
- 更新块表 (可能有新分配的块)
- 更新输出 token 数 (num_output_tokens)
-
处理 KV Cache 操作:
- swap_in: CPU->GPU, 将换出的 KV Cache 块换回
- swap_out: GPU->CPU, 将不活跃的 KV Cache 块换出
- copy_on_write: 复制共享块,确保写时隔离
-
移除完成请求:
- 清除请求的缓存状态
- 清除块表行
- 压缩批次(将最后请求移到空位)
-
更新 GPU 缓冲区 (_update_inputs):
- 填充 input_ids [num_tokens]
- 填充 positions [num_tokens]
- 更新 seq_lens [num_reqs]
- 更新 query_start_loc [num_reqs+1]
- 提交 block_table 到 GPU
E.2 KV Cache Swap 操作流程
E.3 Copy-On-Write 详解
Copy-On-Write (COW) 是 KV Cache 共享的关键机制:
当两个请求共享相同前缀时,它们共享 KV Cache 块。当其中一个请求需要修改(追加新 token)时,先复制共享块,然后在副本上修改。
附录 F: 微批次执行全流程
F.1 UBatchWrapper 执行流程详解
F.2 微批次拆分算法
maybe_create_ubatch_slices() 的拆分策略:
- 尽量保持请求的完整性(不拆分单个请求的 token)
- 优先满足 max_ubatch_tokens 约束
- 在满足 token 约束的前提下尽量填满 max_ubatch_size
- 使用贪心算法: 依次添加请求,直到超过阈值
附录 G: Sampler 完整执行流程图
G.1 Sampler.forward() 详细步骤
G.2 采样参数分组策略
Sampler 将请求按采样类型分组,避免混合采样导致的条件分支:
附录 H: Triton Kernel 优化详解
H.1 _topk_log_softmax_kernel 算法
这是采样管线中最关键的 Triton kernel,在 GPU 上并行计算所有请求的 log-softmax 并提取 top-k 结果。
算法步骤:
- 每个 Triton program 处理一个请求
- 分块扫描 logits,计算最大值 (数值稳定)
- 分块计算 exp(logit - max) 的和
- 计算 log_softmax = logit - max - log(sum_exp)
- 选择 top-k 个最大值
性能优化:
- 使用 BLOCK_SIZE 编译时常量,允许循环展开
- 单个 kernel 完成所有计算,避免多次 kernel launch
- 使用 Triton 自动向量化
附录 I: 采样管线性能优化汇总
| 优化技术 | 文件 | 加速原理 | 效果 |
|---|---|---|---|
| UvaBackedTensor | gpu/buffer_utils.py | CPU-GPU零拷贝 | 减少传输延迟 |
| Triton kernel | gpu/sample/logprob.py | 单kernel完成多步 | 减少75% launch开销 |
| 分组采样 | gpu/sample/sampler.py | 避免条件分支 | 提高GPU利用率 |
| Gumbel-max | gpu/sample/gumbel.py | 避免softmax计算 | 减少计算量 |
| Grammar bitmask | gpu/structured_outputs.py | 并行位操作 | 高效token过滤 |
| CUDA Graph | gpu/cudagraph_utils.py | 批量kernel提交 | 减少67% launch开销 |
附录 J: InputBatch 与 SchedulerOutput 数据映射
J.1 完整字段映射表
| SchedulerOutput 字段 | InputBatch 处理 | 更新方法 |
|---|---|---|
| scheduled_new_reqs | 创建 CachedRequestState | add_request() |
| scheduled_cached_reqs | 更新已有请求状态 | _update_resumed_request() |
| num_scheduled_tokens | 更新 num_new_tokens | 直接赋值 |
| blocks_to_swap_in | CPU->GPU KV Cache拷贝 | 直接操作 kv_cache tensor |
| blocks_to_swap_out | GPU->CPU KV Cache拷贝 | 直接操作 kv_cache tensor |
| blocks_to_copy | GPU内KV Cache复制 | 直接操作 kv_cache tensor |
| finished_req_ids | 移除请求 | remove_request() |
| preempted_req_ids | 暂停请求(保留缓存) | remove_request(不删缓存) |
| free_encoder_mm_hashes | 清除编码器缓存 | encoder_cache.pop() |
| new_block_ids_to_zero | 清零新块 | _zero_block_ids() |
J.2 数据映射流程
附录 K: SamplingMetadata 构建详解
K.1 SamplingMetadata 数据结构
SamplingMetadata 是采样器所需的元数据,包含每个请求的采样参数:
| 字段 | 形状 | 说明 |
|---|---|---|
| temperature | Tensor [num_reqs] | 温度参数 |
| top_k | Tensor [num_reqs] | Top-K参数 |
| top_p | Tensor [num_reqs] | Top-P参数 |
| min_p | Tensor [num_reqs] | Min-P参数 |
| seed | Tensor [num_reqs] | 随机种子 |
| repetition_penalty | Tensor [num_reqs] | 重复惩罚 |
| frequency_penalty | Tensor [num_reqs] | 频率惩罚 |
| presence_penalty | Tensor [num_reqs] | 存在惩罚 |
| logit_bias | Tensor [num_reqs, vocab_size] | logit偏置 |
| sampling_type | SamplingType[] [num_reqs] | 采样类型 |
K.2 SamplingType 分类逻辑
| 条件 | SamplingType | 采样方法 |
|---|---|---|
| temperature==0 | GREEDY | argmax(logits) |
| top_k==1 | GREEDY | argmax(logits) |
| temperature>0 | RANDOM | temperature + top-k/p + gumbel |
K.3 SamplingMetadata 构建流程
附录 L: LogprobsTensors 数据结构
L.1 LogprobsTensors 字段
LogprobsTensors 存储采样后的对数概率结果:
| 字段 | 形状 | 说明 |
|---|---|---|
| token_ids | [num_reqs, max_logprobs] | top-k token IDs |
| values | [num_reqs, max_logprobs] | top-k log概率值 |
| normalized | bool | 是否已归一化 |
L.2 GPU-CPU 数据转换
GPU上的LogprobsTensors需要转换为CPU端的LogprobsLists才能返回给用户:
附录 M: 采样管线性能分析
M.1 采样管线延迟分解
| 步骤 | 典型延迟 | 占比 |
|---|---|---|
| Penalties apply | 5us | 3% |
| Logit bias apply | 2us | 1% |
| Temperature + TopK and TopP | 15us | 10% |
| Gumbel sample | 10us | 7% |
| Grammar bitmask | 20us | 13% |
| Logprob compute (Triton) | 80us | 53% |
| Bad words filter | 3us | 2% |
| GPU-CPU copy | 15us | 10% |
| Total | ~150us | 100% |
M.2 采样管线与模型前向传播的时间比
模型前向传播占推理步骤的90-95% (约2-5ms),采样管线占3-8% (约0.1-0.2ms)。
虽然采样管线只占总推理时间的3-8%,但在高吞吐场景下,优化采样管线可以显著提高整体QPS。
M.3 采样管线优化方向
- Logprob Triton kernel优化: 占53%延迟,是最大瓶颈
- Grammar bitmask批量化: 占13%,使用更高效的位操作
- GPU-CPU异步拷贝: 占10%,使用独立CUDA Stream
- Penalties向量化: 占3%,使用GPU并行计算
附录 N: PromptLogprob 计算详解
N.1 提示对数概率计算方式
PromptLogprob 计算提示中每个token的条件概率:
对于 prompt [t0, t1, t2, t3]:
- logprob(t0) 不计算(无上下文)
- logprob(t1) = log P(t1 | t0)
- logprob(t2) = log P(t2 | t0, t1)
- logprob(t3) = log P(t3 | t0, t1, t2)
N.2 PromptLogprob 计算流程
附录 O: TPU InputBatch 特殊性
O.1 TPU 与 GPU 的关键差异
TPU InputBatch 与 GPU 版本的主要差异:
| 特性 | GPU InputBatch | TPU InputBatch |
|---|---|---|
| 计算后端 | CUDA/Triton | XLA/JAX |
| 形状约束 | CUDA Graph要求固定形状 | XLA编译要求固定形状 |
| 内存管理 | CUDA显存 | TPU HBM |
| 通信 | NCCL | TPU Mesh |
| 优化kernel | Triton/CUDA custom | XLA编译优化 |
O.2 XLA 编译 vs CUDA Graph
更多推荐


所有评论(0)