request_complete_callback 的最终作用

request_complete_callback(data=output_data) 是 ROLL 框架中异步生成请求完成后的回调函数,负责将 SGLang 生成的结果返回给原始请求方。 1

核心功能

1. 数据回传机制

回调函数将处理完成的 DataProto 对象传递回调用方,包含:

  • output_token_ids: 生成的 token 序列
  • finish_reasons: 完成原因
  • output_logprobs: 输出概率(如果启用)
  • 原始的 meta_info 元信息

2. 请求生命周期管理

ActorWorker.request_complete 方法中,回调函数执行以下操作 2

def request_complete(self, data: DataProto):
    data.meta_info["eos_token_id"] = self.tokenizer.eos_token_id
    data.meta_info["pad_token_id"] = self.tokenizer.pad_token_id
    response_call_back_fn = self.response_call_back_fns.pop(data.meta_info["request_id"])
    self.response_callback_refs.append(response_call_back_fn(data))

完整调用链

客户端 ActorWorker SgLangStrategy AsyncEngine SGLang服务 add_request(data) 存储response_callback_fn add_request(command, data) add_request(thread_queue, data) 生成请求 流式生成中... 返回chunks process_sglang_output() request_complete_callback(data) request_complete(data) response_callback_fn(data) 客户端 ActorWorker SgLangStrategy AsyncEngine SGLang服务

关键设计特点

异步非阻塞模式

  • 回调机制支持异步生成,不阻塞主线程
  • 通过 response_call_back_fns 字典管理多个并发请求 3

数据完整性保证

  • 确保生成的 token IDs 和概率信息长度对齐 4
  • 保留原始请求的元信息,便于追踪和调试

错误处理

  • 对于被中止的请求,返回 finish_reasons = [None] 5
  • 支持部分完成结果的收集(当 collect_unfinished=True 时)

Notes

  • request_complete_callback 是连接 SGLang 异步引擎和 ROLL 框架的桥梁
  • 该机制实现了生产者-消费者模式,提高了系统的吞吐量
  • 回调函数的引用存储在 response_callback_refs 中,便于管理和清理 6
  • 不同 SGLang 版本的回调机制保持一致,确保了系统的兼容性

Wiki pages you might want to explore:

Wiki pages you might want to explore:

Citations

File: roll/third_party/sglang/async_engine.py (L55-55)

            output_data.meta_info["finish_reasons"] = [None]  # not finished

File: roll/third_party/sglang/async_engine.py (L68-70)

            assert all([len(ids) == len(logprobs) for ids, logprobs in zip(output_token_ids, output_logprobs)]), (
                "output_token_ids and output_logprobs length not match"
            )

File: roll/third_party/sglang/async_engine.py (L73-73)

        request_complete_callback(data=output_data)

File: roll/pipeline/base_worker.py (L382-382)

            self.response_call_back_fns[data.meta_info["request_id"]] = data.meta_info.pop("response_callback_fn")

File: roll/pipeline/base_worker.py (L386-390)

    def request_complete(self, data: DataProto):
        data.meta_info["eos_token_id"] = self.tokenizer.eos_token_id
        data.meta_info["pad_token_id"] = self.tokenizer.pad_token_id
        response_call_back_fn = self.response_call_back_fns.pop(data.meta_info["request_id"])
        self.response_callback_refs.append(response_call_back_fn(data))
Logo

免费领 200 小时云算力,进群参与显卡、AI PC 幸运抽奖

更多推荐