模型服务化:FastAPI + Triton + vLLM 生产部署

1. 引言

训练好的模型需要以 API 服务的形式提供给下游应用。本文介绍三种主流的模型服务化方案。

方案对比:

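| 方案 | 适用场景 | 吞吐量 | 易用性 |
| --- | --- | --- | --- |
| FastAPI | 小规模、自定义 | — | — |
| Triton | 大规模、多模型 | — | — |
| vLLM | LLM 专用 | 最高 | — |

(注:"—" 表示原表格中该项的评级在转载时丢失。)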

2. FastAPI 服务

2.1 基础服务

import io

import torch
from fastapi import FastAPI, File, HTTPException, UploadFile
from PIL import Image, UnidentifiedImageError
from pydantic import BaseModel

# ASGI application object; the route decorators below register handlers on it.
app = FastAPI(title="Model Service")

# Load the model once at import time so every request reuses the same object.
# NOTE(review): torch.load unpickles the file, so "model.pt" must come from a
# trusted source. Recent PyTorch releases default to weights_only=True, which
# presumably rejects a fully pickled module like this one -- confirm against
# the PyTorch version in use.
model = torch.load("model.pt", map_location="cuda")
model.eval()  # switch to evaluation mode for inference

# Request body for POST /predict.
class PredictionRequest(BaseModel):
    # Raw input text handed to the model.
    text: str
    # Optional length limit, 512 when omitted.
    # NOTE(review): the /predict handler in this file never reads this field.
    max_length: int = 512

# Response body for POST /predict.
class PredictionResponse(BaseModel):
    # Filled from the "label" entry of the model output.
    result: str
    # Filled from the "score" entry of the model output.
    confidence: float

@app.post("/predict", response_model=PredictionResponse)
def predict(request: PredictionRequest):
    """Run the model on the request text and return its label and score.

    Declared with plain ``def`` rather than ``async def``: the forward pass
    is a blocking call, and FastAPI executes plain-``def`` endpoints in its
    worker thread pool, so one slow inference no longer stalls the event
    loop (and with it every other in-flight request).

    Args:
        request: Validated request body; only ``request.text`` is used.

    Returns:
        A ``PredictionResponse`` built from the ``"label"`` and ``"score"``
        entries of the model output.
    """
    # Gradients are never needed when serving.
    with torch.no_grad():
        output = model(request.text)
    return PredictionResponse(
        result=output["label"],
        confidence=output["score"],
    )

@app.post("/predict/image")
async def predict_image(file: UploadFile = File(...)):
    """Accept an uploaded image and return a classification result.

    Args:
        file: Multipart upload containing the image.

    Returns:
        Dict with ``"result"`` and ``"confidence"`` keys. The values are
        still the hard-coded placeholder until real inference is wired in.

    Raises:
        HTTPException: 400 when the upload cannot be identified as an image.
    """
    image_bytes = await file.read()
    try:
        image = Image.open(io.BytesIO(image_bytes))
    except UnidentifiedImageError as exc:
        # A malformed upload is a client error, not a server crash (HTTP 500).
        raise HTTPException(
            status_code=400, detail="Uploaded file is not a valid image"
        ) from exc
    # TODO: preprocess `image` and run the model here.
    return {"result": "cat", "confidence": 0.95}

# 启动: uvicorn app:app --host 0.0.0.0 --port 8000

2.2 批处理与异步

import asyncio
from collections import deque

class BatchProcessor:
    """Group concurrent requests into batches for a single model call.

    Callers await :meth:`add_request`; a background task running
    :meth:`process_loop` drains the queue, invokes the model once per batch
    and resolves each caller's future with its own result.

    Args:
        model: Callable that takes a list of inputs and returns an iterable
            with one result per input, in the same order.
        max_batch: Upper bound on the number of requests per model call.
        max_wait: Seconds to wait for more requests before running a batch
            that is not yet full.

    Raises:
        ValueError: If ``max_batch`` is smaller than 1.
    """

    def __init__(self, model, max_batch=32, max_wait=0.1):
        if max_batch < 1:
            raise ValueError("max_batch must be at least 1")
        self.model = model
        self.max_batch = max_batch
        self.max_wait = max_wait
        self.queue = deque()
        # Not used by the batching logic itself; kept for existing callers.
        self.results = {}

    async def add_request(self, request_id, input_data):
        """Queue one input and wait for its result.

        Args:
            request_id: Caller-chosen identifier, stored alongside the input.
            input_data: The single input to include in a batch.

        Returns:
            The model result that corresponds to ``input_data``.

        Raises:
            Exception: Whatever the model raised for the batch this request
                was part of.
        """
        future = asyncio.get_running_loop().create_future()
        self.queue.append((request_id, input_data, future))
        return await future

    def _run_model(self, inputs):
        """Run one forward pass; executed in a worker thread."""
        # no_grad is thread-local, so it has to be entered in the thread that
        # actually calls the model.
        with torch.no_grad():
            return self.model(inputs)

    async def process_loop(self):
        """Form batches and run the model until the task is cancelled.

        A batch runs as soon as ``max_batch`` requests are queued, or after
        ``max_wait`` seconds with whatever has arrived by then (never more
        than ``max_batch``). A model failure is delivered to the callers of
        that batch as an exception and the loop keeps serving.
        """
        loop = asyncio.get_running_loop()
        while True:
            if not self.queue:
                await asyncio.sleep(0.01)
                continue
            if len(self.queue) < self.max_batch:
                # Give a partial batch a chance to fill up.
                await asyncio.sleep(self.max_wait)

            size = min(len(self.queue), self.max_batch)
            batch = [self.queue.popleft() for _ in range(size)]
            inputs = [item[1] for item in batch]

            try:
                # Off the event loop, so new requests keep being accepted
                # while the model is busy.
                results = await loop.run_in_executor(
                    None, self._run_model, inputs
                )
                for (_, _, future), result in zip(batch, results):
                    # A caller may have been cancelled while waiting.
                    if not future.done():
                        future.set_result(result)
            except Exception as exc:
                failure = exc
            else:
                failure = RuntimeError(
                    "model returned fewer results than inputs"
                )

            # Never leave a caller waiting forever on an unresolved future.
            for _, _, future in batch:
                if not future.done():
                    future.set_exception(failure)

3. Triton Inference Server

3.1 模型仓库结构

model_repository/
├── resnet50/
│   ├── config.pbtxt
│   └── 1/
│       └── model.onnx
├── bert/
│   ├── config.pbtxt
│   └── 1/
│       └── model.plan
└── ensemble/
    ├── config.pbtxt
    └── 1/
        └── model.py

3.2 配置文件

# resnet50/config.pbtxt
# Triton configuration for the ONNX ResNet-50 stored under
# model_repository/resnet50/.
name: "resnet50"
platform: "onnxruntime_onnx"
# Maximum batch size for this model.
max_batch_size: 32

input [
  {
    name: "input"
    data_type: TYPE_FP32
    # Per-sample shape; the batch dimension is not listed here (the client
    # example in this document sends a (1, 3, 224, 224) array).
    dims: [ 3, 224, 224 ]
  }
]

output [
  {
    name: "output"
    data_type: TYPE_FP32
    # One FP32 value per class, 1000 in total.
    dims: [ 1000 ]
  }
]

instance_group [
  {
    # Two instances of the model, of GPU kind.
    count: 2
    kind: KIND_GPU
  }
]

dynamic_batching {
  preferred_batch_size: [ 8, 16 ]
  # The value is in microseconds, so this is 0.1 ms.
  max_queue_delay_microseconds: 100
}

3.3 启动与调用

# Start Triton. Publish the HTTP (8000), gRPC (8001) and metrics (8002) ports;
# without -p the client below cannot reach localhost:8000 from the host.
docker run --gpus all \
  -p 8000:8000 -p 8001:8001 -p 8002:8002 \
  -v /model_repository:/models \
  nvcr.io/nvidia/tritonserver:24.01-py3 \
  tritonserver --model-repository=/models
# Python client: send one random (1, 3, 224, 224) float32 tensor to the
# "resnet50" model over HTTP and read back the "output" tensor.
import numpy as np
import tritonclient.http as httpclient

client = httpclient.InferenceServerClient(url="localhost:8000")

# Prepare the input.
input_data = np.random.randn(1, 3, 224, 224).astype(np.float32)

image_input = httpclient.InferInput("input", input_data.shape, "FP32")
image_input.set_data_from_numpy(input_data)
inputs = [image_input]

outputs = [httpclient.InferRequestedOutput("output")]

result = client.infer("resnet50", inputs, outputs=outputs)
output = result.as_numpy("output")

4. vLLM 服务

4.1 启动服务

# Start the vLLM OpenAI-compatible API server on port 8000, serving
# meta-llama/Llama-2-7b-chat-hf with tensor-parallel size 1, a GPU memory
# utilization setting of 0.9 and a maximum model length of 4096.
python -m vllm.entrypoints.openai.api_server \
    --model meta-llama/Llama-2-7b-chat-hf \
    --tensor-parallel-size 1 \
    --gpu-memory-utilization 0.9 \
    --max-model-len 4096 \
    --port 8000

4.2 调用

from openai import OpenAI

# Talk to the local server through the standard OpenAI client.
client = OpenAI(base_url="http://localhost:8000/v1", api_key="dummy")

# Request parameters gathered in one place, then sent as a single call.
chat_request = {
    "model": "meta-llama/Llama-2-7b-chat-hf",
    "messages": [{"role": "user", "content": "Hello!"}],
    "max_tokens": 100,
    "temperature": 0.7,
}
response = client.chat.completions.create(**chat_request)
print(response.choices[0].message.content)

4.3 批量推理

import asyncio
from openai import AsyncOpenAI

# Async client for the local endpoint at http://localhost:8000/v1.
# NOTE(review): "dummy" looks like a placeholder key; presumably the local
# server does not check it -- confirm.
async_client = AsyncOpenAI(base_url="http://localhost:8000/v1", api_key="dummy")

async def batch_infer(
    prompts,
    *,
    model="meta-llama/Llama-2-7b-chat-hf",
    max_tokens=200,
    max_concurrency=None,
):
    """Send every prompt as its own chat completion, concurrently.

    Args:
        prompts: Iterable of user prompts (strings).
        model: Model name sent with each request.
        max_tokens: Completion token limit per request.
        max_concurrency: Optional cap on the number of requests in flight at
            the same time; ``None`` (the default) sends them all at once.

    Returns:
        List of reply texts, in the same order as ``prompts``.

    Raises:
        Exception: The first error raised by the client for any request.
    """
    limiter = asyncio.Semaphore(max_concurrency) if max_concurrency else None

    async def _ask(prompt):
        return await async_client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}],
            max_tokens=max_tokens,
        )

    async def _ask_limited(prompt):
        # Hold a slot for the whole request so at most `max_concurrency`
        # requests are outstanding.
        async with limiter:
            return await _ask(prompt)

    send = _ask if limiter is None else _ask_limited
    results = await asyncio.gather(*(send(prompt) for prompt in prompts))
    return [r.choices[0].message.content for r in results]

# Batch call: submit all three prompts in one batch_infer() run and collect
# the replies in prompt order.
prompts = ["解释量子计算", "写一首诗", "Python 的优势是什么"]
results = asyncio.run(batch_infer(prompts))

5. 生产部署最佳实践

5.1 Docker Compose

# docker-compose.yml
version: '3.8'
services:
  # vLLM server container; same model and GPU memory setting as the
  # command-line example in section 4.1.
  model-server:
    image: vllm/vllm-openai:latest
    runtime: nvidia
    environment:
      - NVIDIA_VISIBLE_DEVICES=all
    # Folded into a single line of command-line arguments.
    # NOTE(review): presumably appended to the image's entrypoint -- confirm.
    command: >
      --model meta-llama/Llama-2-7b-chat-hf
      --gpu-memory-utilization 0.9
    # Publish the API on host port 8000.
    ports:
      - "8000:8000"
    # Reserve GPU-capable devices for this service.
    deploy:
      resources:
        reservations:
          devices:
            - capabilities: [gpu]

  # nginx published on port 443, configured by the mounted nginx.conf.
  # NOTE(review): presumably the reverse proxy / load balancer in front of
  # model-server; only nginx.conf is mounted, so TLS certificates for the
  # 443 listener would have to be provided separately -- confirm.
  nginx:
    image: nginx:latest
    ports:
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf

5.2 监控

from prometheus_client import Counter, Histogram, generate_latest

# Counter incremented once per HTTP request by the middleware below.
REQUEST_COUNT = Counter('model_requests_total', 'Total requests')
# Histogram of request handling time in seconds.
REQUEST_LATENCY = Histogram('model_request_latency_seconds', 'Request latency')

@app.middleware("http")
async def monitor(request, call_next):
    """Count each HTTP request and time the downstream handler."""
    REQUEST_COUNT.inc()
    latency_timer = REQUEST_LATENCY.time()
    # The timer records the elapsed time when the block exits.
    with latency_timer:
        return await call_next(request)

@app.get("/metrics")
async def metrics():
    """Expose the collected metrics in the Prometheus text format.

    ``generate_latest()`` returns bytes. Returning them directly makes
    FastAPI encode the payload as a JSON string, which Prometheus cannot
    scrape, so the bytes are wrapped in a raw ``Response`` carrying the
    exposition content type.

    Returns:
        A ``Response`` whose body is the current metrics snapshot.
    """
    # Local imports keep this snippet self-contained.
    from fastapi import Response
    from prometheus_client import CONTENT_TYPE_LATEST

    return Response(content=generate_latest(), media_type=CONTENT_TYPE_LATEST)

6. 总结

模型服务化选型:

  1. 小规模/自定义:FastAPI,灵活可控
  2. 大规模/多模型:Triton,企业级方案
  3. LLM 专用:vLLM,吞吐量最高
  4. 生产必备:Docker + 监控 + 负载均衡
Logo

免费领 200 小时云算力,进群参与显卡、AI PC 幸运抽奖

更多推荐