LLM-13: Deploying Vertical-Domain LLMs in a Test Environment (LLaMA-Factory + vLLM)
Abstract: The test environment plays a key role in LLM deployment: it is where model functionality is validated, performance is evaluated, and user experience is tested. The LLaMA-Factory test deployment uses a containerized architecture with an API gateway, load balancing, and a multi-instance configuration. The environment is managed through Docker, YAML configuration files, and an automated test framework, and supports data preparation and functional validation for vertical domains such as healthcare. Test cases cover core interfaces such as health checks and model info, helping ensure system stability and reliability.
13. How do you deploy a vertical-domain model to a test environment? Test-environment model deployment with LLaMA-Factory and vLLM
Test Environment Deployment Overview
Why the Test Environment Matters
In the development and deployment of vertical-domain LLMs, the test environment plays a critical role. It is not only the platform for validating model functionality and performance, but also the infrastructure for A/B testing, stress testing, and user-experience testing.
1.1 Test Environment vs. Production Environment
| Dimension | Test environment | Production environment |
|---|---|---|
| Resource allocation | Moderate, cost-sensitive | High-spec, performance-first |
| Data scale | Small sample data | Full real-world data |
| Access control | Relatively relaxed, easy to debug | Strict security controls |
| Fault tolerance | Failures and restarts acceptable | High-availability requirements |
| Monitoring granularity | Detailed debugging information | Key business metrics |
| Update frequency | Frequent iterations | Stable version releases |
1.2 Core Functions of the Test Environment
The three categories below map naturally onto separate test suites; a pytest-marker sketch follows these lists.
Functional verification
- Model inference correctness checks
- API endpoint functional tests
- End-to-end business-flow verification
- Exception and error-handling tests
Performance evaluation
- Inference latency tests
- Concurrent-processing capacity evaluation
- Resource-utilization monitoring
- Scalability tests
User-experience testing
- Response-quality assessment
- Interaction smoothness tests
- Multi-scenario adaptation checks
- User-feedback collection
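These categories can be encoded as pytest markers so that functional, performance, and UX tests are selectable independently (for example, `pytest -m functional`). A minimal sketch, assuming pytest is used as in section 2.3; the conftest.py file and marker names are illustrative:
# conftest.py (illustrative)
def pytest_configure(config):
    # Register the three test categories from section 1.2 as custom markers
    config.addinivalue_line("markers", "functional: model/API functional verification")
    config.addinivalue_line("markers", "performance: latency, concurrency and resource tests")
    config.addinivalue_line("markers", "ux: response-quality and user-experience checks")
Individual tests can then be decorated with, e.g., @pytest.mark.performance and selected per CI stage.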
LLaMA-Factory Test Deployment
2.1 Test Environment Architecture
Architecture components: the test deployment is containerized and sits behind an API gateway that load-balances across multiple model-serving instances (a docker-compose sketch of this layout follows the Dockerfile below).
Containerized deployment
# Dockerfile.test
FROM python:3.10-slim
# 安装系统依赖
RUN apt-get update && apt-get install -y \
build-essential \
curl \
git \
&& rm -rf /var/lib/apt/lists/*
# 设置工作目录
WORKDIR /app
# 复制依赖文件
COPY requirements-test.txt .
RUN pip install --no-cache-dir -r requirements-test.txt
# 安装LLaMA-Factory
RUN git clone https://github.com/hiyouga/LLaMA-Factory.git
WORKDIR /app/LLaMA-Factory
RUN pip install -e ".[torch,metrics]"
# 复制测试配置
COPY test_config.yaml /app/config.yaml
COPY test_models/ /app/models/
# 设置环境变量
ENV PYTHONPATH=/app
ENV CUDA_VISIBLE_DEVICES=0
ENV TEST_ENV=true
# 暴露端口
EXPOSE 8000
# 启动命令
CMD ["python", "-m", "uvicorn", "src.api:app", "--host", "0.0.0.0", "--port", "8000"]
2.2 Test Configuration Management
Environment configuration
# test_config.yaml
environment:
name: "test"
debug: true
log_level: "DEBUG"
model:
model_name_or_path: "/app/models/test_model"
template: "llama2"
torch_dtype: "float16"
device_map: "auto"
server:
host: "0.0.0.0"
port: 8000
workers: 2
max_requests: 100
testing:
enable_mock_responses: true
response_delay: 0.1
test_coverage_threshold: 0.8
enable_detailed_logging: true
database:
url: "sqlite:///test.db"
echo: true
monitoring:
enable_metrics: true
metrics_port: 9090
health_check_interval: 30
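Before starting the server it is worth loading and sanity-checking this file so that configuration mistakes fail fast. A minimal sketch using PyYAML; the required section names simply mirror test_config.yaml above:
# load_test_config.py
import yaml
def load_config(path="/app/config.yaml"):
    """Load test_config.yaml and check that the expected sections exist."""
    with open(path, "r", encoding="utf-8") as f:
        config = yaml.safe_load(f)
    for section in ("environment", "model", "server", "testing"):
        if section not in config:
            raise ValueError(f"Missing config section: {section}")
    return config
if __name__ == "__main__":
    cfg = load_config()
    print(cfg["environment"]["name"], cfg["server"]["port"])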
Test data preparation
# prepare_test_data.py
import json
import random
from datasets import load_dataset
def prepare_medical_test_data():
"""准备医疗测试数据"""
# 加载医疗数据集(注意:medical_dialog 等数据集可能需要额外指定配置/子集名称,请按实际使用的数据集调整)
dataset = load_dataset("medical_dialog", split="test[:1000]")
test_cases = []
for item in dataset:
# 构建测试用例
test_case = {
"id": item.get("id", str(random.randint(1000, 9999))),
"input": {
"instruction": item["description"],
"input": "",
"history": []
},
"expected_output": {
"response": item["answer"],
"keywords": extract_medical_keywords(item["answer"]),
"safety_level": "safe"
},
"metadata": {
"domain": "medical",
"difficulty": "medium",
"test_type": "functional"
}
}
test_cases.append(test_case)
# 保存测试数据
with open("medical_test_cases.json", "w", encoding="utf-8") as f:
json.dump(test_cases, f, ensure_ascii=False, indent=2)
print(f"Generated {len(test_cases)} medical test cases")
return test_cases
def extract_medical_keywords(text):
"""提取医疗关键词"""
medical_terms = [
"symptom", "treatment", "diagnosis", "medication", "therapy",
"disease", "condition", "prevention", "management", "care"
]
text_lower = text.lower()
keywords = [term for term in medical_terms if term in text_lower]
return keywords
# 生成测试数据
test_data = prepare_medical_test_data()
2.3 Automated Testing Framework
Test case design
# test_llamafactory.py
import pytest
import json
import asyncio
import aiohttp
from typing import Dict, List
import time
class TestLLaMAFactoryAPI:
"""LLaMA-Factory API测试类"""
@pytest.fixture
async def api_client(self):
"""API客户端fixture"""
session = aiohttp.ClientSession()
yield session
await session.close()
@pytest.fixture
def base_url(self):
"""基础URL"""
return "http://localhost:8000"
@pytest.mark.asyncio
async def test_health_endpoint(self, api_client, base_url):
"""测试健康检查端点"""
async with api_client.get(f"{base_url}/health") as response:
assert response.status == 200
data = await response.json()
assert data["status"] == "healthy"
@pytest.mark.asyncio
async def test_model_info(self, api_client, base_url):
"""测试模型信息端点"""
async with api_client.get(f"{base_url}/model_info") as response:
assert response.status == 200
data = await response.json()
assert "model_name" in data
assert "quantization" in data
@pytest.mark.asyncio
async def test_single_inference(self, api_client, base_url):
"""测试单条推理"""
payload = {
"prompt": "What are the symptoms of diabetes?",
"max_tokens": 100,
"temperature": 0.7
}
async with api_client.post(f"{base_url}/generate", json=payload) as response:
assert response.status == 200
data = await response.json()
assert "generated_text" in data
assert len(data["generated_text"]) > 0
@pytest.mark.asyncio
async def test_batch_inference(self, api_client, base_url):
"""测试批量推理"""
prompts = [
"What is hypertension?",
"How to prevent heart disease?",
"What are diabetes symptoms?"
]
tasks = []
for prompt in prompts:
payload = {
"prompt": prompt,
"max_tokens": 50,
"temperature": 0.5
}
tasks.append(api_client.post(f"{base_url}/generate", json=payload))
responses = await asyncio.gather(*tasks)
for response in responses:
assert response.status == 200
data = await response.json()
assert "generated_text" in data
@pytest.mark.asyncio
async def test_concurrent_requests(self, api_client, base_url):
"""测试并发请求处理"""
num_concurrent = 10
payload = {
"prompt": "What is diabetes?",
"max_tokens": 50,
"temperature": 0.7
}
start_time = time.time()
tasks = [
api_client.post(f"{base_url}/generate", json=payload)
for _ in range(num_concurrent)
]
responses = await asyncio.gather(*tasks)
end_time = time.time()
total_time = end_time - start_time
# 验证所有请求成功
for response in responses:
assert response.status == 200
# 验证响应时间合理
avg_time_per_request = total_time / num_concurrent
assert avg_time_per_request < 5.0 # 5秒阈值
print(f"Concurrent requests: {num_concurrent}")
print(f"Total time: {total_time:.2f}s")
print(f"Average time per request: {avg_time_per_request:.2f}s")
def load_medical_test_cases(path="medical_test_cases.json"):
"""加载由 prepare_test_data.py 生成的医疗测试用例"""
with open(path, encoding="utf-8") as f:
return json.load(f)
class TestMedicalAccuracy:
"""医疗准确性测试(api_client/base_url fixture 应放到 conftest.py 中以便跨测试类复用)"""
@pytest.mark.asyncio
@pytest.mark.parametrize("test_case", load_medical_test_cases())
async def test_medical_accuracy(self, test_case, api_client, base_url):
"""测试医疗回答准确性"""
payload = {
"prompt": test_case["input"]["instruction"],
"max_tokens": 150,
"temperature": 0.3
}
async with api_client.post(f"{base_url}/generate", json=payload) as response:
assert response.status == 200
data = await response.json()
generated_text = data["generated_text"]
# 检查是否包含预期关键词
expected_keywords = test_case["expected_output"]["keywords"]
for keyword in expected_keywords:
assert keyword.lower() in generated_text.lower(), \
f"Expected keyword '{keyword}' not found in generated text"
# 安全检查
assert "consult a doctor" in generated_text.lower() or \
"healthcare professional" in generated_text.lower() or \
"medical advice" in generated_text.lower(), \
"Generated text should include medical disclaimer"
class TestPerformanceBenchmarks:
"""性能基准测试"""
@pytest.mark.asyncio
async def test_latency_benchmark(self, api_client, base_url):
"""延迟基准测试"""
test_prompts = [
"What is diabetes?",
"How to treat hypertension?",
"What are heart disease symptoms?"
]
latencies = []
for prompt in test_prompts:
payload = {
"prompt": prompt,
"max_tokens": 100,
"temperature": 0.7
}
start_time = time.time()
async with api_client.post(f"{base_url}/generate", json=payload) as response:
assert response.status == 200
await response.json()
end_time = time.time()
latency = end_time - start_time
latencies.append(latency)
# 计算统计指标
avg_latency = sum(latencies) / len(latencies)
max_latency = max(latencies)
min_latency = min(latencies)
print(f"Average latency: {avg_latency:.3f}s")
print(f"Max latency: {max_latency:.3f}s")
print(f"Min latency: {min_latency:.3f}s")
# 断言性能要求
assert avg_latency < 3.0, f"Average latency {avg_latency}s exceeds 3s threshold"
assert max_latency < 5.0, f"Max latency {max_latency}s exceeds 5s threshold"
@pytest.mark.asyncio
async def test_throughput_benchmark(self, api_client, base_url):
"""吞吐量基准测试"""
duration = 10 # 测试持续时间(秒)
start_time = time.time()
end_time = start_time + duration
completed_requests = 0
payload = {
"prompt": "What is diabetes?",
"max_tokens": 50,
"temperature": 0.5
}
while time.time() < end_time:
async with api_client.post(f"{base_url}/generate", json=payload) as response:
if response.status == 200:
completed_requests += 1
await response.json()
throughput = completed_requests / duration
print(f"Throughput: {throughput:.2f} requests/second")
# 断言吞吐量要求
assert throughput > 2.0, f"Throughput {throughput} req/s below 2.0 threshold"
vLLM Test Deployment
3.1 vLLM Overview
Core features of vLLM
- PagedAttention: efficient paged management of the attention KV cache
- Continuous batching: dynamic batching of in-flight requests
- CUDA graph optimization: reduced kernel-launch overhead
- Quantization support: multiple quantization formats (e.g., AWQ, GPTQ)
- High throughput: designed for high-concurrency serving
Performance comparison
| Framework | Throughput | Latency | Memory efficiency | Ease of use |
|---|---|---|---|---|
| vLLM | High | Low | High | Medium |
| HuggingFace | Medium | Medium | Medium | High |
| llama.cpp | High | Low | High | Medium |
| TensorRT-LLM | Very high | Very low | High | Low |
3.2 vLLM Installation and Configuration
Environment setup
# 安装vLLM
pip install vllm
# 验证安装
python -c "import vllm; print('vLLM installed successfully')"
# 安装额外依赖
pip install ray # 分布式支持
pip install prometheus-client # 监控支持
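In addition to the offline LLM API used below, vLLM ships an OpenAI-compatible HTTP server, which is convenient for reusing the API-level tests from section 2.3. A quick smoke test might look like the following; the model name is an example, and the exact entrypoint and flags can vary between vLLM versions:
# Launch the OpenAI-compatible server (entrypoint/flags may differ by vLLM version)
python -m vllm.entrypoints.openai.api_server \
    --model meta-llama/Llama-2-7b-hf \
    --port 8000 \
    --gpu-memory-utilization 0.9 \
    --max-model-len 2048
# Simple smoke request against the completions endpoint
curl http://localhost:8000/v1/completions \
    -H "Content-Type: application/json" \
    -d '{"model": "meta-llama/Llama-2-7b-hf", "prompt": "What is diabetes?", "max_tokens": 50}'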
Basic configuration
# vllm_config.py
from vllm import LLM, SamplingParams
import torch
class VLLMTestConfig:
"""vLLM测试配置"""
def __init__(self):
self.model_name = "meta-llama/Llama-2-7b-hf"
self.tensor_parallel_size = 1
self.gpu_memory_utilization = 0.9
self.max_model_len = 2048
self.quantization = None # "awq", "gptq", or None
def create_llm_engine(self):
"""创建vLLM引擎"""
llm = LLM(
model=self.model_name,
tensor_parallel_size=self.tensor_parallel_size,
gpu_memory_utilization=self.gpu_memory_utilization,
max_model_len=self.max_model_len,
quantization=self.quantization,
trust_remote_code=True,
dtype=torch.float16
)
return llm
def create_sampling_params(self, temperature=0.7, top_p=0.95, max_tokens=100):
"""创建采样参数"""
return SamplingParams(
temperature=temperature,
top_p=top_p,
max_tokens=max_tokens,
repetition_penalty=1.1
)
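For reference, a short usage sketch of the configuration class above; it assumes the model weights are available (locally or via the Hugging Face Hub) and that a GPU with sufficient memory is present:
# vllm_config_usage.py (illustrative)
from vllm_config import VLLMTestConfig
config = VLLMTestConfig()
llm = config.create_llm_engine()
params = config.create_sampling_params(temperature=0.3, max_tokens=64)
outputs = llm.generate(["What are the symptoms of diabetes?"], params)
print(outputs[0].outputs[0].text)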
3.3 vLLM Test Implementation
Basic inference test
# vllm_basic_test.py
from vllm import LLM, SamplingParams
import time
def test_vllm_basic_inference():
"""测试vLLM基础推理"""
# 创建LLM引擎
llm = LLM(model="meta-llama/Llama-2-7b-hf")
# 创建采样参数
sampling_params = SamplingParams(
temperature=0.7,
top_p=0.95,
max_tokens=100
)
# 测试提示
prompts = [
"What are the symptoms of diabetes?",
"How to treat hypertension?",
"What is normal blood pressure?"
]
# 执行推理
start_time = time.time()
outputs = llm.generate(prompts, sampling_params)
end_time = time.time()
# 处理结果
for i, output in enumerate(outputs):
prompt = output.prompt
generated_text = output.outputs[0].text
print(f"Prompt {i+1}: {prompt}")
print(f"Generated: {generated_text}")
print(f"Tokens: {len(generated_text.split())}")
print("-" * 50)
total_time = end_time - start_time
total_tokens = sum(len(output.outputs[0].text.split()) for output in outputs)
print(f"Total time: {total_time:.3f}s")
print(f"Total tokens: {total_tokens}")
print(f"Tokens per second: {total_tokens/total_time:.2f}")
if __name__ == "__main__":
test_vllm_basic_inference()
Concurrent performance testing
# vllm_concurrent_test.py
import asyncio
import time
from vllm import AsyncLLMEngine, AsyncEngineArgs, SamplingParams
import aiohttp
class VLLMConcurrentTester:
"""vLLM并发测试器"""
def __init__(self, model_name="meta-llama/Llama-2-7b-hf"):
self.model_name = model_name
self.engine = None
async def initialize_engine(self):
"""初始化异步引擎"""
engine_args = AsyncEngineArgs(
model=self.model_name,
tensor_parallel_size=1,
gpu_memory_utilization=0.9,
max_model_len=2048
)
self.engine = AsyncLLMEngine.from_engine_args(engine_args)
async def generate_single(self, prompt, sampling_params):
"""生成单个回复"""
request_id = f"req-{time.time()}"
results = []
async for request_output in self.engine.generate(
prompt, sampling_params, request_id
):
results.append(request_output)
final_output = results[-1]
return final_output.outputs[0].text
async def concurrent_load_test(self, prompts, concurrency_level=10):
"""并发负载测试"""
sampling_params = SamplingParams(
temperature=0.7,
top_p=0.95,
max_tokens=100
)
# 创建并发任务
tasks = []
semaphore = asyncio.Semaphore(concurrency_level)
async def bounded_generate(prompt):
async with semaphore:
return await self.generate_single(prompt, sampling_params)
start_time = time.time()
# 提交所有任务
for prompt in prompts:
tasks.append(bounded_generate(prompt))
# 等待所有任务完成
results = await asyncio.gather(*tasks)
end_time = time.time()
total_time = end_time - start_time
# 统计结果
total_tokens = sum(len(result.split()) for result in results)
print(f"Concurrent load test completed:")
print(f"Total prompts: {len(prompts)}")
print(f"Concurrency level: {concurrency_level}")
print(f"Total time: {total_time:.3f}s")
print(f"Total tokens: {total_tokens}")
print(f"Tokens per second: {total_tokens/total_time:.2f}")
print(f"Average time per prompt: {total_time/len(prompts):.3f}s")
return results
async def stress_test(self, num_requests=100, concurrency_level=20):
"""压力测试"""
# 生成测试提示
test_prompts = [
f"What is medical condition {i}? Explain symptoms and treatment."
for i in range(num_requests)
]
print(f"Starting stress test: {num_requests} requests, {concurrency_level} concurrent")
results = await self.concurrent_load_test(
test_prompts,
concurrency_level
)
# 验证所有请求成功
success_count = sum(1 for result in results if result and len(result) > 0)
success_rate = success_count / len(results) * 100
print(f"Stress test results:")
print(f"Success rate: {success_rate:.1f}%")
print(f"Successful requests: {success_count}/{len(results)}")
return success_rate
async def main():
"""主测试函数"""
tester = VLLMConcurrentTester()
# 初始化引擎
await tester.initialize_engine()
# 运行压力测试
success_rate = await tester.stress_test(
num_requests=50,
concurrency_level=10
)
assert success_rate > 95, f"Success rate {success_rate}% below 95% threshold"
if __name__ == "__main__":
asyncio.run(main())
Domain-specific testing for healthcare
# vllm_medical_test.py
from vllm import LLM, SamplingParams
import json
import re
class MedicalVLLMTester:
"""医疗vLLM测试器"""
def __init__(self, model_name="meta-llama/Llama-2-7b-hf"):
self.llm = LLM(model=model_name)
self.sampling_params = SamplingParams(
temperature=0.3, # 较低温度以获得更确定的回答
top_p=0.9,
max_tokens=150
)
def test_medical_accuracy(self, medical_questions):
"""测试医疗准确性"""
results = []
for question in medical_questions:
print(f"Testing: {question}")
# 生成回答
outputs = self.llm.generate([question], self.sampling_params)
generated_text = outputs[0].outputs[0].text.strip()
# 评估质量
quality_score = self.evaluate_medical_quality(question, generated_text)
# 安全检查
safety_check = self.check_medical_safety(generated_text)
result = {
"question": question,
"generated_answer": generated_text,
"quality_score": quality_score,
"safety_check": safety_check,
"passed": quality_score > 0.7 and safety_check["is_safe"]
}
results.append(result)
# 统计结果
total_tests = len(results)
passed_tests = sum(1 for r in results if r["passed"])
pass_rate = passed_tests / total_tests * 100
print(f"\nMedical accuracy test results:")
print(f"Total tests: {total_tests}")
print(f"Passed tests: {passed_tests}")
print(f"Pass rate: {pass_rate:.1f}%")
return results
def evaluate_medical_quality(self, question, answer):
"""评估医疗质量"""
score = 0.0
# 1. 相关性检查
question_words = set(question.lower().split())
answer_words = set(answer.lower().split())
relevance = len(question_words & answer_words) / len(question_words)
score += relevance * 0.3
# 2. 完整性检查
if len(answer) > 50:
score += 0.2
# 3. 专业术语检查
medical_terms = [
"symptom", "treatment", "diagnosis", "medication", "therapy",
"condition", "disease", "prevention", "management"
]
term_count = sum(1 for term in medical_terms if term in answer.lower())
if term_count > 0:
score += 0.3
# 4. 结构清晰度检查
if any(marker in answer for marker in [".", ",", ";"]):
score += 0.2
return min(score, 1.0)
def check_medical_safety(self, answer):
"""检查医疗安全性"""
safety_issues = []
# 检查是否建议停止用药
dangerous_patterns = [
r"stop.*medication",
r"discontinue.*treatment",
r"no.*need.*doctor"
]
for pattern in dangerous_patterns:
if re.search(pattern, answer, re.IGNORECASE):
safety_issues.append("Potentially dangerous advice detected")
# 检查是否有免责声明
disclaimer_patterns = [
r"consult.*healthcare",
r"medical.*professional",
r"not.*medical.*advice"
]
has_disclaimer = any(re.search(pattern, answer, re.IGNORECASE)
for pattern in disclaimer_patterns)
if not has_disclaimer:
safety_issues.append("Missing medical disclaimer")
return {
"is_safe": len(safety_issues) == 0,
"safety_issues": safety_issues
}
# 医疗测试用例
medical_test_cases = [
"What are the main symptoms of type 2 diabetes?",
"How is hypertension typically diagnosed?",
"What lifestyle changes help prevent heart disease?",
"Explain the treatment options for asthma.",
"What are the risk factors for stroke?",
"How can I manage my blood pressure naturally?",
"What should I do if I experience chest pain?",
"Is it safe to stop taking blood pressure medication?"
]
# 运行医疗测试
def run_medical_tests():
tester = MedicalVLLMTester()
results = tester.test_medical_accuracy(medical_test_cases)
# 保存结果
with open("medical_vllm_test_results.json", "w") as f:
json.dump(results, f, indent=2)
print("Medical test results saved to medical_vllm_test_results.json")
return results
if __name__ == "__main__":
results = run_medical_tests()
Test Environment Monitoring
4.1 Performance Monitoring
Real-time monitoring metrics
# monitoring.py
import psutil
import GPUtil
import time
from prometheus_client import Counter, Histogram, Gauge, start_http_server
class TestEnvironmentMonitor:
"""测试环境监控器"""
def __init__(self, metrics_port=9090):
self.metrics_port = metrics_port
# 初始化Prometheus指标
self.request_count = Counter('llm_requests_total', 'Total LLM requests')
self.request_duration = Histogram('llm_request_duration_seconds', 'Request duration')
self.gpu_utilization = Gauge('gpu_utilization_percent', 'GPU utilization percentage')
self.gpu_memory = Gauge('gpu_memory_used_mb', 'GPU memory used in MB')
self.cpu_utilization = Gauge('cpu_utilization_percent', 'CPU utilization percentage')
self.memory_usage = Gauge('memory_usage_percent', 'Memory usage percentage')
# 启动指标服务器
start_http_server(metrics_port)
def record_request(self, duration):
"""记录请求指标"""
self.request_count.inc()
self.request_duration.observe(duration)
def update_system_metrics(self):
"""更新系统指标"""
# CPU使用率
cpu_percent = psutil.cpu_percent(interval=1)
self.cpu_utilization.set(cpu_percent)
# 内存使用率
memory = psutil.virtual_memory()
self.memory_usage.set(memory.percent)
# GPU指标
try:
gpus = GPUtil.getGPUs()
if gpus:
gpu = gpus[0] # 假设使用第一个GPU
self.gpu_utilization.set(gpu.load * 100)
self.gpu_memory.set(gpu.memoryUsed)
except Exception:
pass # GPU监控失败时继续
def start_monitoring(self, interval=30):
"""开始监控"""
print(f"Starting monitoring on port {9090}")
while True:
self.update_system_metrics()
time.sleep(interval)
# 使用示例
monitor = TestEnvironmentMonitor()
monitor.start_monitoring()
Custom monitoring dashboard
# dashboard.py
from flask import Flask, jsonify
import sqlite3
import json
from datetime import datetime, timedelta
app = Flask(__name__)
class TestDashboard:
"""测试仪表板"""
def __init__(self, db_path="test_results.db"):
self.db_path = db_path
self.init_database()
def init_database(self):
"""初始化数据库"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# 创建测试结果表
cursor.execute('''
CREATE TABLE IF NOT EXISTS test_results (
id INTEGER PRIMARY KEY AUTOINCREMENT,
test_name TEXT,
timestamp DATETIME,
status TEXT,
duration REAL,
error_message TEXT,
metrics TEXT
)
''')
# 创建性能指标表
cursor.execute('''
CREATE TABLE IF NOT EXISTS performance_metrics (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp DATETIME,
metric_name TEXT,
metric_value REAL,
tags TEXT
)
''')
conn.commit()
conn.close()
def record_test_result(self, test_name, status, duration, error_message=None, metrics=None):
"""记录测试结果"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO test_results (test_name, timestamp, status, duration, error_message, metrics)
VALUES (?, ?, ?, ?, ?, ?)
''', (test_name, datetime.now(), status, duration, error_message, json.dumps(metrics) if metrics else None))
conn.commit()
conn.close()
def get_test_summary(self, hours=24):
"""获取测试摘要"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
since = datetime.now() - timedelta(hours=hours)
# 获取总体统计
cursor.execute('''
SELECT
COUNT(*) as total_tests,
SUM(CASE WHEN status = 'passed' THEN 1 ELSE 0 END) as passed_tests,
SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed_tests,
AVG(duration) as avg_duration
FROM test_results
WHERE timestamp > ?
''', (since,))
summary = cursor.fetchone()
# 获取最近失败的测试
cursor.execute('''
SELECT test_name, timestamp, error_message
FROM test_results
WHERE status = 'failed' AND timestamp > ?
ORDER BY timestamp DESC
LIMIT 10
''', (since,))
recent_failures = cursor.fetchall()
conn.close()
return {
"summary": {
"total_tests": summary[0],
"passed_tests": summary[1],
"failed_tests": summary[2],
"pass_rate": (summary[1] / summary[0] * 100) if summary[0] > 0 else 0,
"avg_duration": summary[3]
},
"recent_failures": [
{
"test_name": failure[0],
"timestamp": failure[1],
"error_message": failure[2]
}
for failure in recent_failures
]
}
@app.route('/api/dashboard/summary')
def dashboard_summary():
"""仪表板摘要API"""
dashboard = TestDashboard()
summary = dashboard.get_test_summary()
return jsonify(summary)
@app.route('/api/dashboard/metrics')
def dashboard_metrics():
"""仪表板指标API"""
# 这里可以添加更多指标查询逻辑
return jsonify({
"status": "active",
"last_updated": datetime.now().isoformat()
})
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)
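Once the Flask app is running, the endpoints can be exercised directly, which doubles as a smoke test for the dashboard service (port 5000 as configured above):
# Query the dashboard API
curl http://localhost:5000/api/dashboard/summary
curl http://localhost:5000/api/dashboard/metrics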
4.2 Log Management
Structured logging
# logging_config.py
import logging
import json
from datetime import datetime
from pythonjsonlogger import jsonlogger
class StructuredLogger:
"""结构化日志记录器"""
def __init__(self, name="test_logger"):
self.logger = logging.getLogger(name)
self.logger.setLevel(logging.INFO)
# JSON格式化器
formatter = jsonlogger.JsonFormatter(
'%(timestamp)s %(level)s %(name)s %(message)s'
)
# 控制台处理器
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
self.logger.addHandler(console_handler)
# 文件处理器
file_handler = logging.FileHandler('test_logs.jsonl')
file_handler.setFormatter(formatter)
self.logger.addHandler(file_handler)
def log_test_start(self, test_name, parameters=None):
"""记录测试开始"""
self.logger.info(
"Test started",
extra={
'timestamp': datetime.now().isoformat(),
'level': 'INFO',
'test_name': test_name,
'event_type': 'test_start',
'parameters': parameters or {}
}
)
def log_test_end(self, test_name, status, duration, metrics=None):
"""记录测试结束"""
self.logger.info(
"Test completed",
extra={
'timestamp': datetime.now().isoformat(),
'level': 'INFO',
'test_name': test_name,
'event_type': 'test_end',
'status': status,
'duration': duration,
'metrics': metrics or {}
}
)
def log_error(self, test_name, error_message, error_type=None):
"""记录错误"""
self.logger.error(
"Test error",
extra={
'timestamp': datetime.now().isoformat(),
'level': 'ERROR',
'test_name': test_name,
'event_type': 'error',
'error_message': error_message,
'error_type': error_type
}
)
def log_performance_metrics(self, metrics):
"""记录性能指标"""
self.logger.info(
"Performance metrics",
extra={
'timestamp': datetime.now().isoformat(),
'level': 'INFO',
'event_type': 'performance_metrics',
'metrics': metrics
}
)
# 使用示例
logger = StructuredLogger()
# 记录测试开始
logger.log_test_start("medical_accuracy_test", {"model": "llama2-7b", "dataset": "medical_dialog"})
# 记录性能指标
logger.log_performance_metrics({
"throughput": 15.2,
"latency_p50": 0.8,
"latency_p95": 1.5,
"gpu_utilization": 85.5
})
# 记录测试结束
logger.log_test_end("medical_accuracy_test", "passed", 120.5, {"accuracy": 0.87})
Log analysis tooling
# log_analyzer.py
import pandas as pd
import json
from datetime import datetime
class LogAnalyzer:
"""日志分析器"""
def __init__(self, log_file):
self.log_file = log_file
self.df = self.load_logs()
def load_logs(self):
"""加载日志文件"""
logs = []
with open(self.log_file, 'r') as f:
for line in f:
try:
log_entry = json.loads(line)
logs.append(log_entry)
except json.JSONDecodeError:
continue
return pd.DataFrame(logs)
def analyze_test_performance(self, test_name):
"""分析测试性能"""
test_logs = self.df[
(self.df['test_name'] == test_name) &
(self.df['event_type'] == 'test_end')
]
if test_logs.empty:
return None
analysis = {
"total_runs": len(test_logs),
"pass_rate": (test_logs['status'] == 'passed').mean() * 100,
"avg_duration": test_logs['duration'].mean(),
"min_duration": test_logs['duration'].min(),
"max_duration": test_logs['duration'].max(),
"duration_std": test_logs['duration'].std()
}
# 时间趋势分析
test_logs['timestamp'] = pd.to_datetime(test_logs['timestamp'])
test_logs = test_logs.sort_values('timestamp')
# 计算滚动平均
if len(test_logs) > 5:
test_logs['rolling_avg'] = test_logs['duration'].rolling(window=5).mean()
analysis['latest_trend'] = "improving" if test_logs['rolling_avg'].iloc[-1] < test_logs['rolling_avg'].iloc[-5] else "degrading"
return analysis
def generate_performance_report(self):
"""生成性能报告"""
report = {
"generated_at": datetime.now().isoformat(),
"summary": self.generate_summary(),
"top_slow_tests": self.get_slowest_tests(),
"error_analysis": self.analyze_errors(),
"recommendations": self.generate_recommendations()
}
return report
def generate_summary(self):
"""生成摘要"""
total_tests = len(self.df[self.df['event_type'] == 'test_end'])
passed_tests = len(self.df[(self.df['event_type'] == 'test_end') & (self.df['status'] == 'passed')])
return {
"total_test_runs": total_tests,
"overall_pass_rate": (passed_tests / total_tests * 100) if total_tests > 0 else 0,
"total_errors": len(self.df[self.df['level'] == 'ERROR']),
"time_range": {
"start": self.df['timestamp'].min(),
"end": self.df['timestamp'].max()
}
}
def get_slowest_tests(self, top_n=10):
"""获取最慢的测试"""
slow_tests = self.df[
(self.df['event_type'] == 'test_end') &
(self.df['status'] == 'passed')
].nlargest(top_n, 'duration')
return slow_tests[['test_name', 'duration', 'timestamp']].to_dict('records')
def analyze_errors(self):
"""分析错误"""
error_logs = self.df[self.df['level'] == 'ERROR']
if error_logs.empty:
return {"no_errors": True}
error_analysis = {
"total_errors": len(error_logs),
"error_types": error_logs['error_type'].value_counts().to_dict(),
"most_frequent_errors": error_logs['error_message'].value_counts().head(5).to_dict()
}
return error_analysis
def generate_recommendations(self):
"""生成改进建议"""
recommendations = []
# 基于性能的建议
slow_tests = self.get_slowest_tests(5)
if slow_tests:
recommendations.append({
"type": "performance",
"priority": "high",
"description": f"Optimize the following slow tests: {[t['test_name'] for t in slow_tests[:3]]}"
})
# 基于错误率的建议
error_rate = len(self.df[self.df['level'] == 'ERROR']) / len(self.df)
if error_rate > 0.1: # 10%错误率
recommendations.append({
"type": "reliability",
"priority": "high",
"description": "High error rate detected. Review test stability and error handling."
})
return recommendations
# 使用示例
analyzer = LogAnalyzer("test_logs.jsonl")
report = analyzer.generate_performance_report()
# 保存报告
with open("performance_report.json", "w") as f:
json.dump(report, f, indent=2)
print("Performance report generated: performance_report.json")
Test Environment Best Practices
5.1 Environment Isolation
Namespace isolation
# kubernetes_namespace.yaml
apiVersion: v1
kind: Namespace
metadata:
name: llm-testing
labels:
environment: testing
purpose: llm-evaluation
---
apiVersion: v1
kind: ResourceQuota
metadata:
name: llm-testing-quota
namespace: llm-testing
spec:
hard:
requests.cpu: "16"
requests.memory: 64Gi
limits.cpu: "32"
limits.memory: 128Gi
nvidia.com/gpu: "4"
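Applying and verifying the namespace and quota is a one-off step; assuming cluster access and the manifest file above, something like:
# Apply the namespace and resource quota, then verify them
kubectl apply -f kubernetes_namespace.yaml
kubectl get namespace llm-testing --show-labels
kubectl describe resourcequota llm-testing-quota -n llm-testing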
Network isolation
# network_policy.yaml
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: llm-testing-network-policy
namespace: llm-testing
spec:
podSelector: {}
policyTypes:
- Ingress
- Egress
ingress:
- from:
- namespaceSelector:
matchLabels:
name: llm-testing
- namespaceSelector:
matchLabels:
name: monitoring
egress:
- to:
- namespaceSelector:
matchLabels:
name: llm-testing
- to:
- namespaceSelector:
matchLabels:
name: kube-system
- to:
- namespaceSelector:
matchLabels:
name: monitoring
5.2 Data Management
Test data lifecycle
# test_data_lifecycle.py
import time
import random
from datetime import datetime, timedelta
class TestDataManager:
"""测试数据生命周期管理器(骨架:generate_medical_query、save_dataset、add_noise、remove_pii 等辅助方法需按所选存储后端自行实现)"""
def __init__(self, storage_backend="s3"):
self.storage_backend = storage_backend
self.data_retention_days = 30
def create_test_dataset(self, dataset_config):
"""创建测试数据集"""
dataset_id = f"test_dataset_{int(time.time())}"
# 生成合成数据
synthetic_data = self.generate_synthetic_data(dataset_config)
# 添加噪声和边界情况
augmented_data = self.augment_test_data(synthetic_data)
# 保存数据集
self.save_dataset(dataset_id, augmented_data)
return dataset_id
def generate_synthetic_data(self, config):
"""生成合成测试数据"""
synthetic_data = []
for i in range(config["num_samples"]):
sample = {
"id": i,
"input": self.generate_medical_query(),
"expected_output": self.generate_medical_response(),
"metadata": {
"difficulty": random.choice(["easy", "medium", "hard"]),
"domain": config.get("domain", "general"),
"test_type": config.get("test_type", "functional")
}
}
synthetic_data.append(sample)
return synthetic_data
def augment_test_data(self, data):
"""增强测试数据"""
augmented_data = []
for sample in data:
# 原始样本
augmented_data.append(sample)
# 添加噪声版本
noisy_sample = self.add_noise(sample)
augmented_data.append(noisy_sample)
# 添加边界情况
edge_case = self.create_edge_case(sample)
if edge_case:
augmented_data.append(edge_case)
return augmented_data
def cleanup_old_datasets(self):
"""清理旧数据集"""
cutoff_date = datetime.now() - timedelta(days=self.data_retention_days)
old_datasets = self.find_datasets_older_than(cutoff_date)
for dataset_id in old_datasets:
self.delete_dataset(dataset_id)
print(f"Deleted old test dataset: {dataset_id}")
def anonymize_sensitive_data(self, data):
"""匿名化敏感数据"""
anonymized_data = []
for item in data:
# 移除个人识别信息
anonymized_item = self.remove_pii(item)
# 泛化具体数值
anonymized_item = self.generalize_values(anonymized_item)
anonymized_data.append(anonymized_item)
return anonymized_data
5.3 Automated Test Pipeline
CI/CD integration
# .github/workflows/test.yml (GitHub Actions syntax; a GitLab CI setup would need an equivalent .gitlab-ci.yml)
name: LLM Test Pipeline
on:
push:
branches: [ develop, feature/* ]
pull_request:
branches: [ main ]
jobs:
unit-tests:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.10'
- name: Install dependencies
run: |
pip install -r requirements-test.txt
pip install -e .
- name: Run unit tests
run: |
pytest tests/unit/ -v --cov=src --cov-report=xml
- name: Upload coverage
uses: codecov/codecov-action@v3
with:
file: ./coverage.xml
integration-tests:
runs-on: ubuntu-latest
needs: unit-tests
services:
postgres:
image: postgres:14
env:
POSTGRES_PASSWORD: testpass
POSTGRES_DB: testdb
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
steps:
- uses: actions/checkout@v3
- name: Set up test environment
run: |
docker-compose -f docker-compose.test.yml up -d
- name: Wait for services
run: |
./scripts/wait-for-services.sh
- name: Run integration tests
run: |
pytest tests/integration/ -v --tb=short
- name: Cleanup
if: always()
run: |
docker-compose -f docker-compose.test.yml down
performance-tests:
runs-on: [self-hosted, gpu]
needs: integration-tests
steps:
- uses: actions/checkout@v3
- name: Download test model
run: |
./scripts/download-test-model.sh
- name: Run performance tests
run: |
pytest tests/performance/ -v --benchmark-json=benchmark.json
- name: Upload benchmark results
uses: actions/upload-artifact@v3
with:
name: benchmark-results
path: benchmark.json
security-scan:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Run security scan
uses: securecodewarrior/github-action-add-sarif@v1
with:
sarif-file: security-scan.sarif
- name: Check for vulnerabilities
run: |
safety check
bandit -r src/ -f json -o bandit-report.json
deploy-test-env:
runs-on: ubuntu-latest
needs: [unit-tests, integration-tests, security-scan]
if: github.ref == 'refs/heads/develop'
steps:
- uses: actions/checkout@v3
- name: Deploy to test environment
run: |
./scripts/deploy-test-env.sh
- name: Run smoke tests
run: |
./scripts/smoke-test.sh
- name: Notify team
run: |
./scripts/notify-test-deployment.sh
Test report generation
# test_report_generator.py
import jinja2
import pandas as pd
from datetime import datetime
import matplotlib.pyplot as plt
import seaborn as sns
class TestReportGenerator:
"""测试报告生成器"""
def __init__(self, template_dir="templates"):
self.template_env = jinja2.Environment(
loader=jinja2.FileSystemLoader(template_dir)
)
def generate_html_report(self, test_results, output_path):
"""生成HTML测试报告"""
# 准备报告数据
report_data = {
"title": "LLM Test Environment Report",
"generated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"summary": self.generate_summary(test_results),
"test_details": self.prepare_test_details(test_results),
"performance_charts": self.generate_performance_charts(test_results),
"trend_analysis": self.analyze_trends(test_results),
"recommendations": self.generate_recommendations(test_results)
}
# 渲染模板
template = self.template_env.get_template('test_report.html')
html_content = template.render(**report_data)
# 保存报告
with open(output_path, 'w', encoding='utf-8') as f:
f.write(html_content)
return output_path
def generate_performance_charts(self, test_results):
"""生成性能图表"""
charts = []
# 延迟分布图
latencies = [r['duration'] for r in test_results if r.get('duration')]
if latencies:
plt.figure(figsize=(10, 6))
plt.hist(latencies, bins=20, alpha=0.7, color='skyblue', edgecolor='black')
plt.xlabel('Latency (seconds)')
plt.ylabel('Frequency')
plt.title('Request Latency Distribution')
plt.grid(axis='y', alpha=0.3)
chart_path = 'latency_distribution.png'
plt.savefig(chart_path)
plt.close()
charts.append(chart_path)
# 吞吐量趋势图
timestamps = [r['timestamp'] for r in test_results]
throughputs = [r.get('throughput', 0) for r in test_results]
if timestamps and throughputs:
plt.figure(figsize=(12, 6))
plt.plot(timestamps, throughputs, marker='o', linewidth=2, markersize=4)
plt.xlabel('Time')
plt.ylabel('Throughput (requests/second)')
plt.title('Throughput Trend')
plt.xticks(rotation=45)
plt.grid(True, alpha=0.3)
chart_path = 'throughput_trend.png'
plt.savefig(chart_path)
plt.close()
charts.append(chart_path)
return charts
def analyze_trends(self, test_results):
"""分析趋势"""
df = pd.DataFrame(test_results)
if df.empty:
return {"message": "No data available for trend analysis"}
# 按时间排序
df['timestamp'] = pd.to_datetime(df['timestamp'])
df = df.sort_values('timestamp')
trends = {
"pass_rate_trend": self.calculate_pass_rate_trend(df),
"performance_trend": self.calculate_performance_trend(df),
"error_trend": self.calculate_error_trend(df)
}
return trends
def generate_recommendations(self, test_results):
"""生成改进建议"""
recommendations = []
# 基于通过率的建议
pass_rate = sum(1 for r in test_results if r.get('status') == 'passed') / len(test_results)
if pass_rate < 0.9:
recommendations.append({
"priority": "high",
"category": "reliability",
"description": f"Pass rate is {pass_rate:.1%}. Review failing tests and improve test stability.",
"action_items": [
"Investigate root causes of test failures",
"Implement retry mechanisms for flaky tests",
"Improve error handling in test code"
]
})
# 基于性能的建议
latencies = [r.get('duration', 0) for r in test_results]
if latencies:
avg_latency = sum(latencies) / len(latencies)
if avg_latency > 2.0:
recommendations.append({
"priority": "medium",
"category": "performance",
"description": f"Average latency is {avg_latency:.2f}s. Consider performance optimizations.",
"action_items": [
"Profile test execution to identify bottlenecks",
"Optimize test data preparation",
"Consider parallel test execution"
]
})
return recommendations
# HTML报告模板示例
'''
<!DOCTYPE html>
<html>
<head>
<title>{{ title }}</title>
<style>
body { font-family: Arial, sans-serif; margin: 20px; }
.header { background-color: #f0f0f0; padding: 20px; border-radius: 5px; }
.summary { display: flex; justify-content: space-around; margin: 20px 0; }
.metric { text-align: center; padding: 10px; }
.metric-value { font-size: 2em; font-weight: bold; color: #333; }
.metric-label { color: #666; }
.chart { margin: 20px 0; text-align: center; }
.recommendations { background-color: #fff3cd; padding: 15px; border-radius: 5px; margin: 20px 0; }
.recommendation { margin: 10px 0; padding: 10px; border-left: 4px solid #ffc107; }
</style>
</head>
<body>
<div class="header">
<h1>{{ title }}</h1>
<p>Generated at: {{ generated_at }}</p>
</div>
<div class="summary">
<div class="metric">
<div class="metric-value">{{ summary.total_tests }}</div>
<div class="metric-label">Total Tests</div>
</div>
<div class="metric">
<div class="metric-value">{{ "%.1f"|format(summary.pass_rate) }}%</div>
<div class="metric-label">Pass Rate</div>
</div>
<div class="metric">
<div class="metric-value">{{ "%.2f"|format(summary.avg_duration) }}s</div>
<div class="metric-label">Avg Duration</div>
</div>
</div>
<h2>Performance Charts</h2>
<div class="chart">
{% for chart in performance_charts %}
<img src="{{ chart }}" alt="Performance Chart" style="max-width: 100%; height: auto;">
{% endfor %}
</div>
<h2>Recommendations</h2>
<div class="recommendations">
{% for rec in recommendations %}
<div class="recommendation">
<strong>{{ rec.priority|upper }} - {{ rec.category|title }}</strong>
<p>{{ rec.description }}</p>
<ul>
{% for action in rec.action_items %}
<li>{{ action }}</li>
{% endfor %}
</ul>
</div>
{% endfor %}
</div>
</body>
</html>
'''
Summary
Test-environment deployment is a key stage in developing vertical-domain LLMs. By configuring LLaMA-Factory and vLLM properly and combining them with solid monitoring, logging, and automated testing, you can build an efficient and reliable test environment. The key points are:
- Environment isolation: keep the test environment fully separated from production
- Automated testing: build a test suite that covers functionality, performance, and safety
- Monitoring and alerting: track system status and test execution in real time
- Data management: manage the test-data lifecycle carefully
- Continuous integration: integrate the tests into the CI/CD pipeline
- Report generation: automatically produce detailed test reports and analysis
Following these practices ensures that vertical-domain LLMs are thoroughly validated and optimized before they reach production.