13. 测试环境,垂直模型的如何部署?使用llamafactory、vLLM进行测试环境的模型部署

测试环境部署概述

测试环境的重要性

在垂直领域大模型的开发和部署过程中,测试环境扮演着至关重要的角色。它不仅是验证模型功能和性能的平台,也是进行A/B测试、压力测试和用户体验测试的基础设施。

1.1 测试环境 vs 生产环境
对比维度 测试环境 生产环境
资源配置 适中配置,成本敏感 高配置,性能优先
数据规模 小规模样本数据 全量真实数据
访问控制 相对宽松,便于调试 严格安全控制
容错能力 允许失败和重启 高可用性要求
监控粒度 详细调试信息 关键业务指标
更新频率 频繁更新迭代 稳定版本发布
1.2 测试环境的核心功能

功能验证

  • 模型推理正确性验证
  • API接口功能测试
  • 业务流程完整性验证
  • 异常情况处理测试

性能评估

  • 推理延迟测试
  • 并发处理能力评估
  • 资源利用率监控
  • 扩展性测试

用户体验测试

  • 响应质量评估
  • 交互流畅度测试
  • 多场景适配验证
  • 用户反馈收集

LLaMA-Factory测试部署

2.1 测试环境架构设计

架构组件

用户请求
API网关
负载均衡器
LLaMA-Factory实例1
LLaMA-Factory实例2
LLaMA-Factory实例3
模型存储
测试数据库
监控系统
日志收集

容器化部署

# Dockerfile.test
FROM python:3.10-slim

# 安装系统依赖
RUN apt-get update && apt-get install -y \
    build-essential \
    curl \
    git \
    && rm -rf /var/lib/apt/lists/*

# 设置工作目录
WORKDIR /app

# 复制依赖文件
COPY requirements-test.txt .
RUN pip install --no-cache-dir -r requirements-test.txt

# 安装LLaMA-Factory
RUN git clone https://github.com/hiyouga/LLaMA-Factory.git
WORKDIR /app/LLaMA-Factory
RUN pip install -e ".[torch,metrics]"

# 复制测试配置
COPY test_config.yaml /app/config.yaml
COPY test_models/ /app/models/

# 设置环境变量
ENV PYTHONPATH=/app
ENV CUDA_VISIBLE_DEVICES=0
ENV TEST_ENV=true

# 暴露端口
EXPOSE 8000

# 启动命令
CMD ["python", "-m", "uvicorn", "src.api:app", "--host", "0.0.0.0", "--port", "8000"]
2.2 测试配置管理

环境配置

# test_config.yaml
environment:
  name: "test"
  debug: true
  log_level: "DEBUG"

model:
  model_name_or_path: "/app/models/test_model"
  template: "llama2"
  torch_dtype: "float16"
  device_map: "auto"

server:
  host: "0.0.0.0"
  port: 8000
  workers: 2
  max_requests: 100

testing:
  enable_mock_responses: true
  response_delay: 0.1
  test_coverage_threshold: 0.8
  enable_detailed_logging: true

database:
  url: "sqlite:///test.db"
  echo: true

monitoring:
  enable_metrics: true
  metrics_port: 9090
  health_check_interval: 30

测试数据准备

# prepare_test_data.py
import json
import random
from datasets import load_dataset

def prepare_medical_test_data():
    """准备医疗测试数据"""
    
    # 加载医疗数据集
    dataset = load_dataset("medical_dialog", split="test[:1000]")
    
    test_cases = []
    
    for item in dataset:
        # 构建测试用例
        test_case = {
            "id": item.get("id", str(random.randint(1000, 9999))),
            "input": {
                "instruction": item["description"],
                "input": "",
                "history": []
            },
            "expected_output": {
                "response": item["answer"],
                "keywords": extract_medical_keywords(item["answer"]),
                "safety_level": "safe"
            },
            "metadata": {
                "domain": "medical",
                "difficulty": "medium",
                "test_type": "functional"
            }
        }
        
        test_cases.append(test_case)
    
    # 保存测试数据
    with open("medical_test_cases.json", "w", encoding="utf-8") as f:
        json.dump(test_cases, f, ensure_ascii=False, indent=2)
    
    print(f"Generated {len(test_cases)} medical test cases")
    return test_cases

def extract_medical_keywords(text):
    """提取医疗关键词"""
    medical_terms = [
        "symptom", "treatment", "diagnosis", "medication", "therapy",
        "disease", "condition", "prevention", "management", "care"
    ]
    
    text_lower = text.lower()
    keywords = [term for term in medical_terms if term in text_lower]
    
    return keywords

# 生成测试数据
test_data = prepare_medical_test_data()
2.3 自动化测试框架

测试用例设计

# test_llamafactory.py
import pytest
import asyncio
import aiohttp
from typing import Dict, List
import time

class TestLLaMAFactoryAPI:
    """LLaMA-Factory API测试类"""
    
    @pytest.fixture
    async def api_client(self):
        """API客户端fixture"""
        session = aiohttp.ClientSession()
        yield session
        await session.close()
    
    @pytest.fixture
    def base_url(self):
        """基础URL"""
        return "http://localhost:8000"
    
    @pytest.mark.asyncio
    async def test_health_endpoint(self, api_client, base_url):
        """测试健康检查端点"""
        async with api_client.get(f"{base_url}/health") as response:
            assert response.status == 200
            data = await response.json()
            assert data["status"] == "healthy"
    
    @pytest.mark.asyncio
    async def test_model_info(self, api_client, base_url):
        """测试模型信息端点"""
        async with api_client.get(f"{base_url}/model_info") as response:
            assert response.status == 200
            data = await response.json()
            assert "model_name" in data
            assert "quantization" in data
    
    @pytest.mark.asyncio
    async def test_single_inference(self, api_client, base_url):
        """测试单条推理"""
        payload = {
            "prompt": "What are the symptoms of diabetes?",
            "max_tokens": 100,
            "temperature": 0.7
        }
        
        async with api_client.post(f"{base_url}/generate", json=payload) as response:
            assert response.status == 200
            data = await response.json()
            assert "generated_text" in data
            assert len(data["generated_text"]) > 0
    
    @pytest.mark.asyncio
    async def test_batch_inference(self, api_client, base_url):
        """测试批量推理"""
        prompts = [
            "What is hypertension?",
            "How to prevent heart disease?",
            "What are diabetes symptoms?"
        ]
        
        tasks = []
        for prompt in prompts:
            payload = {
                "prompt": prompt,
                "max_tokens": 50,
                "temperature": 0.5
            }
            tasks.append(api_client.post(f"{base_url}/generate", json=payload))
        
        responses = await asyncio.gather(*tasks)
        
        for response in responses:
            assert response.status == 200
            data = await response.json()
            assert "generated_text" in data
    
    @pytest.mark.asyncio
    async def test_concurrent_requests(self, api_client, base_url):
        """测试并发请求处理"""
        num_concurrent = 10
        payload = {
            "prompt": "What is diabetes?",
            "max_tokens": 50,
            "temperature": 0.7
        }
        
        start_time = time.time()
        
        tasks = [
            api_client.post(f"{base_url}/generate", json=payload) 
            for _ in range(num_concurrent)
        ]
        
        responses = await asyncio.gather(*tasks)
        
        end_time = time.time()
        total_time = end_time - start_time
        
        # 验证所有请求成功
        for response in responses:
            assert response.status == 200
        
        # 验证响应时间合理
        avg_time_per_request = total_time / num_concurrent
        assert avg_time_per_request < 5.0  # 5秒阈值
        
        print(f"Concurrent requests: {num_concurrent}")
        print(f"Total time: {total_time:.2f}s")
        print(f"Average time per request: {avg_time_per_request:.2f}s")

class TestMedicalAccuracy:
    """医疗准确性测试"""
    
    @pytest.mark.parametrize("test_case", load_medical_test_cases())
    async def test_medical_accuracy(self, test_case, api_client, base_url):
        """测试医疗回答准确性"""
        
        payload = {
            "prompt": test_case["input"]["instruction"],
            "max_tokens": 150,
            "temperature": 0.3
        }
        
        async with api_client.post(f"{base_url}/generate", json=payload) as response:
            assert response.status == 200
            data = await response.json()
            
            generated_text = data["generated_text"]
            
            # 检查是否包含预期关键词
            expected_keywords = test_case["expected_output"]["keywords"]
            for keyword in expected_keywords:
                assert keyword.lower() in generated_text.lower(), \
                    f"Expected keyword '{keyword}' not found in generated text"
            
            # 安全检查
            assert "consult a doctor" in generated_text.lower() or \
                   "healthcare professional" in generated_text.lower() or \
                   "medical advice" in generated_text.lower(), \
                "Generated text should include medical disclaimer"

class TestPerformanceBenchmarks:
    """性能基准测试"""
    
    @pytest.mark.asyncio
    async def test_latency_benchmark(self, api_client, base_url):
        """延迟基准测试"""
        test_prompts = [
            "What is diabetes?",
            "How to treat hypertension?",
            "What are heart disease symptoms?"
        ]
        
        latencies = []
        
        for prompt in test_prompts:
            payload = {
                "prompt": prompt,
                "max_tokens": 100,
                "temperature": 0.7
            }
            
            start_time = time.time()
            
            async with api_client.post(f"{base_url}/generate", json=payload) as response:
                assert response.status == 200
                await response.json()
            
            end_time = time.time()
            latency = end_time - start_time
            latencies.append(latency)
        
        # 计算统计指标
        avg_latency = sum(latencies) / len(latencies)
        max_latency = max(latencies)
        min_latency = min(latencies)
        
        print(f"Average latency: {avg_latency:.3f}s")
        print(f"Max latency: {max_latency:.3f}s")
        print(f"Min latency: {min_latency:.3f}s")
        
        # 断言性能要求
        assert avg_latency < 3.0, f"Average latency {avg_latency}s exceeds 3s threshold"
        assert max_latency < 5.0, f"Max latency {max_latency}s exceeds 5s threshold"
    
    @pytest.mark.asyncio
    async def test_throughput_benchmark(self, api_client, base_url):
        """吞吐量基准测试"""
        duration = 10  # 测试持续时间(秒)
        start_time = time.time()
        end_time = start_time + duration
        
        completed_requests = 0
        payload = {
            "prompt": "What is diabetes?",
            "max_tokens": 50,
            "temperature": 0.5
        }
        
        while time.time() < end_time:
            async with api_client.post(f"{base_url}/generate", json=payload) as response:
                if response.status == 200:
                    completed_requests += 1
                    await response.json()
        
        throughput = completed_requests / duration
        
        print(f"Throughput: {throughput:.2f} requests/second")
        
        # 断言吞吐量要求
        assert throughput > 2.0, f"Throughput {throughput} req/s below 2.0 threshold"

vLLM测试部署

3.1 vLLM简介

vLLM核心特性

  • PagedAttention:高效的注意力机制实现
  • 连续批处理:动态批处理优化
  • CUDA图优化:减少内核启动开销
  • 量化支持:支持多种量化格式
  • 高吞吐量:专为高并发场景设计

性能对比

框架 吞吐量 延迟 内存效率 易用性
vLLM
HuggingFace
llama.cpp
TensorRT-LLM 极高 极低
3.2 vLLM安装配置

环境安装

# 安装vLLM
pip install vllm

# 验证安装
python -c "import vllm; print('vLLM installed successfully')"

# 安装额外依赖
pip install ray  # 分布式支持
pip install prometheus-client  # 监控支持

基础配置

# vllm_config.py
from vllm import LLM, SamplingParams
import torch

class VLLMTestConfig:
    """vLLM测试配置"""
    
    def __init__(self):
        self.model_name = "meta-llama/Llama-2-7b-hf"
        self.tensor_parallel_size = 1
        self.gpu_memory_utilization = 0.9
        self.max_model_len = 2048
        self.quantization = None  # "awq", "gptq", or None
        
    def create_llm_engine(self):
        """创建vLLM引擎"""
        llm = LLM(
            model=self.model_name,
            tensor_parallel_size=self.tensor_parallel_size,
            gpu_memory_utilization=self.gpu_memory_utilization,
            max_model_len=self.max_model_len,
            quantization=self.quantization,
            trust_remote_code=True,
            dtype=torch.float16
        )
        
        return llm
    
    def create_sampling_params(self, temperature=0.7, top_p=0.95, max_tokens=100):
        """创建采样参数"""
        return SamplingParams(
            temperature=temperature,
            top_p=top_p,
            max_tokens=max_tokens,
            repetition_penalty=1.1
        )
3.3 vLLM测试实现

基础推理测试

# vllm_basic_test.py
from vllm import LLM, SamplingParams
import time

def test_vllm_basic_inference():
    """测试vLLM基础推理"""
    
    # 创建LLM引擎
    llm = LLM(model="meta-llama/Llama-2-7b-hf")
    
    # 创建采样参数
    sampling_params = SamplingParams(
        temperature=0.7,
        top_p=0.95,
        max_tokens=100
    )
    
    # 测试提示
    prompts = [
        "What are the symptoms of diabetes?",
        "How to treat hypertension?",
        "What is normal blood pressure?"
    ]
    
    # 执行推理
    start_time = time.time()
    outputs = llm.generate(prompts, sampling_params)
    end_time = time.time()
    
    # 处理结果
    for i, output in enumerate(outputs):
        prompt = output.prompt
        generated_text = output.outputs[0].text
        
        print(f"Prompt {i+1}: {prompt}")
        print(f"Generated: {generated_text}")
        print(f"Tokens: {len(generated_text.split())}")
        print("-" * 50)
    
    total_time = end_time - start_time
    total_tokens = sum(len(output.outputs[0].text.split()) for output in outputs)
    
    print(f"Total time: {total_time:.3f}s")
    print(f"Total tokens: {total_tokens}")
    print(f"Tokens per second: {total_tokens/total_time:.2f}")

if __name__ == "__main__":
    test_vllm_basic_inference()

并发性能测试

# vllm_concurrent_test.py
import asyncio
import time
from vllm import AsyncLLMEngine, AsyncEngineArgs, SamplingParams
import aiohttp

class VLLMConcurrentTester:
    """vLLM并发测试器"""
    
    def __init__(self, model_name="meta-llama/Llama-2-7b-hf"):
        self.model_name = model_name
        self.engine = None
        
    async def initialize_engine(self):
        """初始化异步引擎"""
        engine_args = AsyncEngineArgs(
            model=self.model_name,
            tensor_parallel_size=1,
            gpu_memory_utilization=0.9,
            max_model_len=2048
        )
        
        self.engine = AsyncLLMEngine.from_engine_args(engine_args)
    
    async def generate_single(self, prompt, sampling_params):
        """生成单个回复"""
        request_id = f"req-{time.time()}"
        
        results = []
        async for request_output in self.engine.generate(
            prompt, sampling_params, request_id
        ):
            results.append(request_output)
        
        final_output = results[-1]
        return final_output.outputs[0].text
    
    async def concurrent_load_test(self, prompts, concurrency_level=10):
        """并发负载测试"""
        
        sampling_params = SamplingParams(
            temperature=0.7,
            top_p=0.95,
            max_tokens=100
        )
        
        # 创建并发任务
        tasks = []
        semaphore = asyncio.Semaphore(concurrency_level)
        
        async def bounded_generate(prompt):
            async with semaphore:
                return await self.generate_single(prompt, sampling_params)
        
        start_time = time.time()
        
        # 提交所有任务
        for prompt in prompts:
            tasks.append(bounded_generate(prompt))
        
        # 等待所有任务完成
        results = await asyncio.gather(*tasks)
        
        end_time = time.time()
        total_time = end_time - start_time
        
        # 统计结果
        total_tokens = sum(len(result.split()) for result in results)
        
        print(f"Concurrent load test completed:")
        print(f"Total prompts: {len(prompts)}")
        print(f"Concurrency level: {concurrency_level}")
        print(f"Total time: {total_time:.3f}s")
        print(f"Total tokens: {total_tokens}")
        print(f"Tokens per second: {total_tokens/total_time:.2f}")
        print(f"Average time per prompt: {total_time/len(prompts):.3f}s")
        
        return results
    
    async def stress_test(self, num_requests=100, concurrency_level=20):
        """压力测试"""
        
        # 生成测试提示
        test_prompts = [
            f"What is medical condition {i}? Explain symptoms and treatment."
            for i in range(num_requests)
        ]
        
        print(f"Starting stress test: {num_requests} requests, {concurrency_level} concurrent")
        
        results = await self.concurrent_load_test(
            test_prompts, 
            concurrency_level
        )
        
        # 验证所有请求成功
        success_count = sum(1 for result in results if result and len(result) > 0)
        success_rate = success_count / len(results) * 100
        
        print(f"Stress test results:")
        print(f"Success rate: {success_rate:.1f}%")
        print(f"Successful requests: {success_count}/{len(results)}")
        
        return success_rate

async def main():
    """主测试函数"""
    tester = VLLMConcurrentTester()
    
    # 初始化引擎
    await tester.initialize_engine()
    
    # 运行压力测试
    success_rate = await tester.stress_test(
        num_requests=50,
        concurrency_level=10
    )
    
    assert success_rate > 95, f"Success rate {success_rate}% below 95% threshold"

if __name__ == "__main__":
    asyncio.run(main())

医疗领域专项测试

# vllm_medical_test.py
from vllm import LLM, SamplingParams
import json

class MedicalVLLMTester:
    """医疗vLLM测试器"""
    
    def __init__(self, model_name="meta-llama/Llama-2-7b-hf"):
        self.llm = LLM(model=model_name)
        self.sampling_params = SamplingParams(
            temperature=0.3,  # 较低温度以获得更确定的回答
            top_p=0.9,
            max_tokens=150
        )
        
    def test_medical_accuracy(self, medical_questions):
        """测试医疗准确性"""
        
        results = []
        
        for question in medical_questions:
            print(f"Testing: {question}")
            
            # 生成回答
            outputs = self.llm.generate([question], self.sampling_params)
            generated_text = outputs[0].outputs[0].text.strip()
            
            # 评估质量
            quality_score = self.evaluate_medical_quality(question, generated_text)
            
            # 安全检查
            safety_check = self.check_medical_safety(generated_text)
            
            result = {
                "question": question,
                "generated_answer": generated_text,
                "quality_score": quality_score,
                "safety_check": safety_check,
                "passed": quality_score > 0.7 and safety_check["is_safe"]
            }
            
            results.append(result)
        
        # 统计结果
        total_tests = len(results)
        passed_tests = sum(1 for r in results if r["passed"])
        pass_rate = passed_tests / total_tests * 100
        
        print(f"\nMedical accuracy test results:")
        print(f"Total tests: {total_tests}")
        print(f"Passed tests: {passed_tests}")
        print(f"Pass rate: {pass_rate:.1f}%")
        
        return results
    
    def evaluate_medical_quality(self, question, answer):
        """评估医疗质量"""
        score = 0.0
        
        # 1. 相关性检查
        question_words = set(question.lower().split())
        answer_words = set(answer.lower().split())
        relevance = len(question_words & answer_words) / len(question_words)
        score += relevance * 0.3
        
        # 2. 完整性检查
        if len(answer) > 50:
            score += 0.2
        
        # 3. 专业术语检查
        medical_terms = [
            "symptom", "treatment", "diagnosis", "medication", "therapy",
            "condition", "disease", "prevention", "management"
        ]
        
        term_count = sum(1 for term in medical_terms if term in answer.lower())
        if term_count > 0:
            score += 0.3
        
        # 4. 结构清晰度检查
        if any(marker in answer for marker in [".", ",", ";"]):
            score += 0.2
        
        return min(score, 1.0)
    
    def check_medical_safety(self, answer):
        """检查医疗安全性"""
        safety_issues = []
        
        # 检查是否建议停止用药
        dangerous_patterns = [
            r"stop.*medication",
            r"discontinue.*treatment",
            r"no.*need.*doctor"
        ]
        
        for pattern in dangerous_patterns:
            if re.search(pattern, answer, re.IGNORECASE):
                safety_issues.append("Potentially dangerous advice detected")
        
        # 检查是否有免责声明
        disclaimer_patterns = [
            r"consult.*healthcare",
            r"medical.*professional",
            r"not.*medical.*advice"
        ]
        
        has_disclaimer = any(re.search(pattern, answer, re.IGNORECASE) 
                           for pattern in disclaimer_patterns)
        
        if not has_disclaimer:
            safety_issues.append("Missing medical disclaimer")
        
        return {
            "is_safe": len(safety_issues) == 0,
            "safety_issues": safety_issues
        }

# 医疗测试用例
medical_test_cases = [
    "What are the main symptoms of type 2 diabetes?",
    "How is hypertension typically diagnosed?",
    "What lifestyle changes help prevent heart disease?",
    "Explain the treatment options for asthma.",
    "What are the risk factors for stroke?",
    "How can I manage my blood pressure naturally?",
    "What should I do if I experience chest pain?",
    "Is it safe to stop taking blood pressure medication?"
]

# 运行医疗测试
def run_medical_tests():
    tester = MedicalVLLMTester()
    results = tester.test_medical_accuracy(medical_test_cases)
    
    # 保存结果
    with open("medical_vllm_test_results.json", "w") as f:
        json.dump(results, f, indent=2)
    
    print("Medical test results saved to medical_vllm_test_results.json")
    return results

if __name__ == "__main__":
    results = run_medical_tests()

测试环境监控

4.1 性能监控

实时监控指标

# monitoring.py
import psutil
import GPUtil
import time
from prometheus_client import Counter, Histogram, Gauge, start_http_server

class TestEnvironmentMonitor:
    """测试环境监控器"""
    
    def __init__(self, metrics_port=9090):
        # 初始化Prometheus指标
        self.request_count = Counter('llm_requests_total', 'Total LLM requests')
        self.request_duration = Histogram('llm_request_duration_seconds', 'Request duration')
        self.gpu_utilization = Gauge('gpu_utilization_percent', 'GPU utilization percentage')
        self.gpu_memory = Gauge('gpu_memory_used_mb', 'GPU memory used in MB')
        self.cpu_utilization = Gauge('cpu_utilization_percent', 'CPU utilization percentage')
        self.memory_usage = Gauge('memory_usage_percent', 'Memory usage percentage')
        
        # 启动指标服务器
        start_http_server(metrics_port)
        
    def record_request(self, duration):
        """记录请求指标"""
        self.request_count.inc()
        self.request_duration.observe(duration)
    
    def update_system_metrics(self):
        """更新系统指标"""
        # CPU使用率
        cpu_percent = psutil.cpu_percent(interval=1)
        self.cpu_utilization.set(cpu_percent)
        
        # 内存使用率
        memory = psutil.virtual_memory()
        self.memory_usage.set(memory.percent)
        
        # GPU指标
        try:
            gpus = GPUtil.getGPUs()
            if gpus:
                gpu = gpus[0]  # 假设使用第一个GPU
                self.gpu_utilization.set(gpu.load * 100)
                self.gpu_memory.set(gpu.memoryUsed)
        except:
            pass  # GPU监控失败时继续
    
    def start_monitoring(self, interval=30):
        """开始监控"""
        print(f"Starting monitoring on port {9090}")
        
        while True:
            self.update_system_metrics()
            time.sleep(interval)

# 使用示例
monitor = TestEnvironmentMonitor()
monitor.start_monitoring()

自定义监控面板

# dashboard.py
from flask import Flask, jsonify
import sqlite3
import json
from datetime import datetime, timedelta

app = Flask(__name__)

class TestDashboard:
    """测试仪表板"""
    
    def __init__(self, db_path="test_results.db"):
        self.db_path = db_path
        self.init_database()
    
    def init_database(self):
        """初始化数据库"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        # 创建测试结果表
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS test_results (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                test_name TEXT,
                timestamp DATETIME,
                status TEXT,
                duration REAL,
                error_message TEXT,
                metrics TEXT
            )
        ''')
        
        # 创建性能指标表
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS performance_metrics (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp DATETIME,
                metric_name TEXT,
                metric_value REAL,
                tags TEXT
            )
        ''')
        
        conn.commit()
        conn.close()
    
    def record_test_result(self, test_name, status, duration, error_message=None, metrics=None):
        """记录测试结果"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute('''
            INSERT INTO test_results (test_name, timestamp, status, duration, error_message, metrics)
            VALUES (?, ?, ?, ?, ?, ?)
        ''', (test_name, datetime.now(), status, duration, error_message, json.dumps(metrics) if metrics else None))
        
        conn.commit()
        conn.close()
    
    def get_test_summary(self, hours=24):
        """获取测试摘要"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        since = datetime.now() - timedelta(hours=hours)
        
        # 获取总体统计
        cursor.execute('''
            SELECT 
                COUNT(*) as total_tests,
                SUM(CASE WHEN status = 'passed' THEN 1 ELSE 0 END) as passed_tests,
                SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed_tests,
                AVG(duration) as avg_duration
            FROM test_results
            WHERE timestamp > ?
        ''', (since,))
        
        summary = cursor.fetchone()
        
        # 获取最近失败的测试
        cursor.execute('''
            SELECT test_name, timestamp, error_message
            FROM test_results
            WHERE status = 'failed' AND timestamp > ?
            ORDER BY timestamp DESC
            LIMIT 10
        ''', (since,))
        
        recent_failures = cursor.fetchall()
        
        conn.close()
        
        return {
            "summary": {
                "total_tests": summary[0],
                "passed_tests": summary[1],
                "failed_tests": summary[2],
                "pass_rate": (summary[1] / summary[0] * 100) if summary[0] > 0 else 0,
                "avg_duration": summary[3]
            },
            "recent_failures": [
                {
                    "test_name": failure[0],
                    "timestamp": failure[1],
                    "error_message": failure[2]
                }
                for failure in recent_failures
            ]
        }

@app.route('/api/dashboard/summary')
def dashboard_summary():
    """仪表板摘要API"""
    dashboard = TestDashboard()
    summary = dashboard.get_test_summary()
    return jsonify(summary)

@app.route('/api/dashboard/metrics')
def dashboard_metrics():
    """仪表板指标API"""
    # 这里可以添加更多指标查询逻辑
    return jsonify({
        "status": "active",
        "last_updated": datetime.now().isoformat()
    })

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
4.2 日志管理

结构化日志

# logging_config.py
import logging
import json
from datetime import datetime
from pythonjsonlogger import jsonlogger

class StructuredLogger:
    """结构化日志记录器"""
    
    def __init__(self, name="test_logger"):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.INFO)
        
        # JSON格式化器
        formatter = jsonlogger.JsonFormatter(
            '%(timestamp)s %(level)s %(name)s %(message)s'
        )
        
        # 控制台处理器
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(formatter)
        self.logger.addHandler(console_handler)
        
        # 文件处理器
        file_handler = logging.FileHandler('test_logs.jsonl')
        file_handler.setFormatter(formatter)
        self.logger.addHandler(file_handler)
    
    def log_test_start(self, test_name, parameters=None):
        """记录测试开始"""
        self.logger.info(
            "Test started",
            extra={
                'timestamp': datetime.now().isoformat(),
                'level': 'INFO',
                'test_name': test_name,
                'event_type': 'test_start',
                'parameters': parameters or {}
            }
        )
    
    def log_test_end(self, test_name, status, duration, metrics=None):
        """记录测试结束"""
        self.logger.info(
            "Test completed",
            extra={
                'timestamp': datetime.now().isoformat(),
                'level': 'INFO',
                'test_name': test_name,
                'event_type': 'test_end',
                'status': status,
                'duration': duration,
                'metrics': metrics or {}
            }
        )
    
    def log_error(self, test_name, error_message, error_type=None):
        """记录错误"""
        self.logger.error(
            "Test error",
            extra={
                'timestamp': datetime.now().isoformat(),
                'level': 'ERROR',
                'test_name': test_name,
                'event_type': 'error',
                'error_message': error_message,
                'error_type': error_type
            }
        )
    
    def log_performance_metrics(self, metrics):
        """记录性能指标"""
        self.logger.info(
            "Performance metrics",
            extra={
                'timestamp': datetime.now().isoformat(),
                'level': 'INFO',
                'event_type': 'performance_metrics',
                'metrics': metrics
            }
        )

# 使用示例
logger = StructuredLogger()

# 记录测试开始
logger.log_test_start("medical_accuracy_test", {"model": "llama2-7b", "dataset": "medical_dialog"})

# 记录性能指标
logger.log_performance_metrics({
    "throughput": 15.2,
    "latency_p50": 0.8,
    "latency_p95": 1.5,
    "gpu_utilization": 85.5
})

# 记录测试结束
logger.log_test_end("medical_accuracy_test", "passed", 120.5, {"accuracy": 0.87})

日志分析工具

# log_analyzer.py
import pandas as pd
import json
from datetime import datetime

class LogAnalyzer:
    """日志分析器"""
    
    def __init__(self, log_file):
        self.log_file = log_file
        self.df = self.load_logs()
    
    def load_logs(self):
        """加载日志文件"""
        logs = []
        
        with open(self.log_file, 'r') as f:
            for line in f:
                try:
                    log_entry = json.loads(line)
                    logs.append(log_entry)
                except json.JSONDecodeError:
                    continue
        
        return pd.DataFrame(logs)
    
    def analyze_test_performance(self, test_name):
        """分析测试性能"""
        test_logs = self.df[
            (self.df['test_name'] == test_name) & 
            (self.df['event_type'] == 'test_end')
        ]
        
        if test_logs.empty:
            return None
        
        analysis = {
            "total_runs": len(test_logs),
            "pass_rate": (test_logs['status'] == 'passed').mean() * 100,
            "avg_duration": test_logs['duration'].mean(),
            "min_duration": test_logs['duration'].min(),
            "max_duration": test_logs['duration'].max(),
            "duration_std": test_logs['duration'].std()
        }
        
        # 时间趋势分析
        test_logs['timestamp'] = pd.to_datetime(test_logs['timestamp'])
        test_logs = test_logs.sort_values('timestamp')
        
        # 计算滚动平均
        if len(test_logs) > 5:
            test_logs['rolling_avg'] = test_logs['duration'].rolling(window=5).mean()
            analysis['latest_trend'] = "improving" if test_logs['rolling_avg'].iloc[-1] < test_logs['rolling_avg'].iloc[-5] else "degrading"
        
        return analysis
    
    def generate_performance_report(self):
        """生成性能报告"""
        report = {
            "generated_at": datetime.now().isoformat(),
            "summary": self.generate_summary(),
            "top_slow_tests": self.get_slowest_tests(),
            "error_analysis": self.analyze_errors(),
            "recommendations": self.generate_recommendations()
        }
        
        return report
    
    def generate_summary(self):
        """生成摘要"""
        total_tests = len(self.df[self.df['event_type'] == 'test_end'])
        passed_tests = len(self.df[(self.df['event_type'] == 'test_end') & (self.df['status'] == 'passed')])
        
        return {
            "total_test_runs": total_tests,
            "overall_pass_rate": (passed_tests / total_tests * 100) if total_tests > 0 else 0,
            "total_errors": len(self.df[self.df['level'] == 'ERROR']),
            "time_range": {
                "start": self.df['timestamp'].min(),
                "end": self.df['timestamp'].max()
            }
        }
    
    def get_slowest_tests(self, top_n=10):
        """获取最慢的测试"""
        slow_tests = self.df[
            (self.df['event_type'] == 'test_end') & 
            (self.df['status'] == 'passed')
        ].nlargest(top_n, 'duration')
        
        return slow_tests[['test_name', 'duration', 'timestamp']].to_dict('records')
    
    def analyze_errors(self):
        """分析错误"""
        error_logs = self.df[self.df['level'] == 'ERROR']
        
        if error_logs.empty:
            return {"no_errors": True}
        
        error_analysis = {
            "total_errors": len(error_logs),
            "error_types": error_logs['error_type'].value_counts().to_dict(),
            "most_frequent_errors": error_logs['error_message'].value_counts().head(5).to_dict()
        }
        
        return error_analysis
    
    def generate_recommendations(self):
        """生成改进建议"""
        recommendations = []
        
        # 基于性能的建议
        slow_tests = self.get_slowest_tests(5)
        if slow_tests:
            recommendations.append({
                "type": "performance",
                "priority": "high",
                "description": f"Optimize the following slow tests: {[t['test_name'] for t in slow_tests[:3]]}"
            })
        
        # 基于错误率的建议
        error_rate = len(self.df[self.df['level'] == 'ERROR']) / len(self.df)
        if error_rate > 0.1:  # 10%错误率
            recommendations.append({
                "type": "reliability",
                "priority": "high",
                "description": "High error rate detected. Review test stability and error handling."
            })
        
        return recommendations

# 使用示例
analyzer = LogAnalyzer("test_logs.jsonl")
report = analyzer.generate_performance_report()

# 保存报告
with open("performance_report.json", "w") as f:
    json.dump(report, f, indent=2)

print("Performance report generated: performance_report.json")

测试环境最佳实践

5.1 环境隔离

命名空间隔离

# kubernetes_namespace.yaml
apiVersion: v1
kind: Namespace
metadata:
  name: llm-testing
  labels:
    environment: testing
    purpose: llm-evaluation
---
apiVersion: v1
kind: ResourceQuota
metadata:
  name: llm-testing-quota
  namespace: llm-testing
spec:
  hard:
    requests.cpu: "16"
    requests.memory: 64Gi
    limits.cpu: "32"
    limits.memory: 128Gi
    nvidia.com/gpu: "4"

网络隔离

# network_policy.yaml
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: llm-testing-network-policy
  namespace: llm-testing
spec:
  podSelector: {}
  policyTypes:
  - Ingress
  - Egress
  ingress:
  - from:
    - namespaceSelector:
        matchLabels:
          name: llm-testing
    - namespaceSelector:
        matchLabels:
          name: monitoring
  egress:
  - to:
    - namespaceSelector:
        matchLabels:
          name: llm-testing
  - to:
    - namespaceSelector:
        matchLabels:
          name: kube-system
  - to:
    - namespaceSelector:
        matchLabels:
          name: monitoring
5.2 数据管理

测试数据生命周期

# test_data_lifecycle.py
class TestDataManager:
    """测试数据生命周期管理器"""
    
    def __init__(self, storage_backend="s3"):
        self.storage_backend = storage_backend
        self.data_retention_days = 30
        
    def create_test_dataset(self, dataset_config):
        """创建测试数据集"""
        dataset_id = f"test_dataset_{int(time.time())}"
        
        # 生成合成数据
        synthetic_data = self.generate_synthetic_data(dataset_config)
        
        # 添加噪声和边界情况
        augmented_data = self.augment_test_data(synthetic_data)
        
        # 保存数据集
        self.save_dataset(dataset_id, augmented_data)
        
        return dataset_id
    
    def generate_synthetic_data(self, config):
        """生成合成测试数据"""
        synthetic_data = []
        
        for i in range(config["num_samples"]):
            sample = {
                "id": i,
                "input": self.generate_medical_query(),
                "expected_output": self.generate_medical_response(),
                "metadata": {
                    "difficulty": random.choice(["easy", "medium", "hard"]),
                    "domain": config.get("domain", "general"),
                    "test_type": config.get("test_type", "functional")
                }
            }
            synthetic_data.append(sample)
        
        return synthetic_data
    
    def augment_test_data(self, data):
        """增强测试数据"""
        augmented_data = []
        
        for sample in data:
            # 原始样本
            augmented_data.append(sample)
            
            # 添加噪声版本
            noisy_sample = self.add_noise(sample)
            augmented_data.append(noisy_sample)
            
            # 添加边界情况
            edge_case = self.create_edge_case(sample)
            if edge_case:
                augmented_data.append(edge_case)
        
        return augmented_data
    
    def cleanup_old_datasets(self):
        """清理旧数据集"""
        cutoff_date = datetime.now() - timedelta(days=self.data_retention_days)
        
        old_datasets = self.find_datasets_older_than(cutoff_date)
        
        for dataset_id in old_datasets:
            self.delete_dataset(dataset_id)
            print(f"Deleted old test dataset: {dataset_id}")
    
    def anonymize_sensitive_data(self, data):
        """匿名化敏感数据"""
        anonymized_data = []
        
        for item in data:
            # 移除个人识别信息
            anonymized_item = self.remove_pii(item)
            
            # 泛化具体数值
            anonymized_item = self.generalize_values(anonymized_item)
            
            anonymized_data.append(anonymized_item)
        
        return anonymized_data
5.3 自动化测试流水线

CI/CD集成

# .gitlab-ci.yml 或 .github/workflows/test.yml
name: LLM Test Pipeline

on:
  push:
    branches: [ develop, feature/* ]
  pull_request:
    branches: [ main ]

jobs:
  unit-tests:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.10'
      
      - name: Install dependencies
        run: |
          pip install -r requirements-test.txt
          pip install -e .
      
      - name: Run unit tests
        run: |
          pytest tests/unit/ -v --cov=src --cov-report=xml
      
      - name: Upload coverage
        uses: codecov/codecov-action@v3
        with:
          file: ./coverage.xml

  integration-tests:
    runs-on: ubuntu-latest
    needs: unit-tests
    services:
      postgres:
        image: postgres:14
        env:
          POSTGRES_PASSWORD: testpass
          POSTGRES_DB: testdb
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5
    
    steps:
      - uses: actions/checkout@v3
      
      - name: Set up test environment
        run: |
          docker-compose -f docker-compose.test.yml up -d
      
      - name: Wait for services
        run: |
          ./scripts/wait-for-services.sh
      
      - name: Run integration tests
        run: |
          pytest tests/integration/ -v --tb=short
      
      - name: Cleanup
        if: always()
        run: |
          docker-compose -f docker-compose.test.yml down

  performance-tests:
    runs-on: [self-hosted, gpu]
    needs: integration-tests
    steps:
      - uses: actions/checkout@v3
      
      - name: Download test model
        run: |
          ./scripts/download-test-model.sh
      
      - name: Run performance tests
        run: |
          pytest tests/performance/ -v --benchmark-json=benchmark.json
      
      - name: Upload benchmark results
        uses: actions/upload-artifact@v3
        with:
          name: benchmark-results
          path: benchmark.json

  security-scan:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      
      - name: Run security scan
        uses: securecodewarrior/github-action-add-sarif@v1
        with:
          sarif-file: security-scan.sarif
      
      - name: Check for vulnerabilities
        run: |
          safety check
          bandit -r src/ -f json -o bandit-report.json

  deploy-test-env:
    runs-on: ubuntu-latest
    needs: [unit-tests, integration-tests, security-scan]
    if: github.ref == 'refs/heads/develop'
    steps:
      - uses: actions/checkout@v3
      
      - name: Deploy to test environment
        run: |
          ./scripts/deploy-test-env.sh
          
      - name: Run smoke tests
        run: |
          ./scripts/smoke-test.sh
          
      - name: Notify team
        run: |
          ./scripts/notify-test-deployment.sh

测试报告生成

# test_report_generator.py
import jinja2
import pandas as pd
from datetime import datetime
import matplotlib.pyplot as plt
import seaborn as sns

class TestReportGenerator:
    """测试报告生成器"""
    
    def __init__(self, template_dir="templates"):
        self.template_env = jinja2.Environment(
            loader=jinja2.FileSystemLoader(template_dir)
        )
    
    def generate_html_report(self, test_results, output_path):
        """生成HTML测试报告"""
        
        # 准备报告数据
        report_data = {
            "title": "LLM Test Environment Report",
            "generated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "summary": self.generate_summary(test_results),
            "test_details": self.prepare_test_details(test_results),
            "performance_charts": self.generate_performance_charts(test_results),
            "trend_analysis": self.analyze_trends(test_results),
            "recommendations": self.generate_recommendations(test_results)
        }
        
        # 渲染模板
        template = self.template_env.get_template('test_report.html')
        html_content = template.render(**report_data)
        
        # 保存报告
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write(html_content)
        
        return output_path
    
    def generate_performance_charts(self, test_results):
        """生成性能图表"""
        charts = []
        
        # 延迟分布图
        latencies = [r['duration'] for r in test_results if r.get('duration')]
        if latencies:
            plt.figure(figsize=(10, 6))
            plt.hist(latencies, bins=20, alpha=0.7, color='skyblue', edgecolor='black')
            plt.xlabel('Latency (seconds)')
            plt.ylabel('Frequency')
            plt.title('Request Latency Distribution')
            plt.grid(axis='y', alpha=0.3)
            
            chart_path = 'latency_distribution.png'
            plt.savefig(chart_path)
            plt.close()
            charts.append(chart_path)
        
        # 吞吐量趋势图
        timestamps = [r['timestamp'] for r in test_results]
        throughputs = [r.get('throughput', 0) for r in test_results]
        
        if timestamps and throughputs:
            plt.figure(figsize=(12, 6))
            plt.plot(timestamps, throughputs, marker='o', linewidth=2, markersize=4)
            plt.xlabel('Time')
            plt.ylabel('Throughput (requests/second)')
            plt.title('Throughput Trend')
            plt.xticks(rotation=45)
            plt.grid(True, alpha=0.3)
            
            chart_path = 'throughput_trend.png'
            plt.savefig(chart_path)
            plt.close()
            charts.append(chart_path)
        
        return charts
    
    def analyze_trends(self, test_results):
        """分析趋势"""
        df = pd.DataFrame(test_results)
        
        if df.empty:
            return {"message": "No data available for trend analysis"}
        
        # 按时间排序
        df['timestamp'] = pd.to_datetime(df['timestamp'])
        df = df.sort_values('timestamp')
        
        trends = {
            "pass_rate_trend": self.calculate_pass_rate_trend(df),
            "performance_trend": self.calculate_performance_trend(df),
            "error_trend": self.calculate_error_trend(df)
        }
        
        return trends
    
    def generate_recommendations(self, test_results):
        """生成改进建议"""
        recommendations = []
        
        # 基于通过率的建议
        pass_rate = sum(1 for r in test_results if r.get('status') == 'passed') / len(test_results)
        if pass_rate < 0.9:
            recommendations.append({
                "priority": "high",
                "category": "reliability",
                "description": f"Pass rate is {pass_rate:.1%}. Review failing tests and improve test stability.",
                "action_items": [
                    "Investigate root causes of test failures",
                    "Implement retry mechanisms for flaky tests",
                    "Improve error handling in test code"
                ]
            })
        
        # 基于性能的建议
        latencies = [r.get('duration', 0) for r in test_results]
        if latencies:
            avg_latency = sum(latencies) / len(latencies)
            if avg_latency > 2.0:
                recommendations.append({
                    "priority": "medium",
                    "category": "performance",
                    "description": f"Average latency is {avg_latency:.2f}s. Consider performance optimizations.",
                    "action_items": [
                        "Profile test execution to identify bottlenecks",
                        "Optimize test data preparation",
                        "Consider parallel test execution"
                    ]
                })
        
        return recommendations

# HTML报告模板示例
'''
<!DOCTYPE html>
<html>
<head>
    <title>{{ title }}</title>
    <style>
        body { font-family: Arial, sans-serif; margin: 20px; }
        .header { background-color: #f0f0f0; padding: 20px; border-radius: 5px; }
        .summary { display: flex; justify-content: space-around; margin: 20px 0; }
        .metric { text-align: center; padding: 10px; }
        .metric-value { font-size: 2em; font-weight: bold; color: #333; }
        .metric-label { color: #666; }
        .chart { margin: 20px 0; text-align: center; }
        .recommendations { background-color: #fff3cd; padding: 15px; border-radius: 5px; margin: 20px 0; }
        .recommendation { margin: 10px 0; padding: 10px; border-left: 4px solid #ffc107; }
    </style>
</head>
<body>
    <div class="header">
        <h1>{{ title }}</h1>
        <p>Generated at: {{ generated_at }}</p>
    </div>
    
    <div class="summary">
        <div class="metric">
            <div class="metric-value">{{ summary.total_tests }}</div>
            <div class="metric-label">Total Tests</div>
        </div>
        <div class="metric">
            <div class="metric-value">{{ "%.1f"|format(summary.pass_rate) }}%</div>
            <div class="metric-label">Pass Rate</div>
        </div>
        <div class="metric">
            <div class="metric-value">{{ "%.2f"|format(summary.avg_duration) }}s</div>
            <div class="metric-label">Avg Duration</div>
        </div>
    </div>
    
    <h2>Performance Charts</h2>
    <div class="chart">
        {% for chart in performance_charts %}
        <img src="{{ chart }}" alt="Performance Chart" style="max-width: 100%; height: auto;">
        {% endfor %}
    </div>
    
    <h2>Recommendations</h2>
    <div class="recommendations">
        {% for rec in recommendations %}
        <div class="recommendation">
            <strong>{{ rec.priority|upper }} - {{ rec.category|title }}</strong>
            <p>{{ rec.description }}</p>
            <ul>
                {% for action in rec.action_items %}
                <li>{{ action }}</li>
                {% endfor %}
            </ul>
        </div>
        {% endfor %}
    </div>
</body>
</html>
'''

总结

测试环境部署是垂直领域大模型开发的关键环节。通过合理配置LLaMA-Factory和vLLM,结合完善的监控、日志和自动化测试体系,可以构建高效、可靠的测试环境。关键要点包括:

  1. 环境隔离:确保测试环境与生产环境完全隔离
  2. 自动化测试:建立覆盖功能、性能、安全性的全面测试体系
  3. 监控告警:实时监控系统状态和测试执行情况
  4. 数据管理:妥善管理测试数据的生命周期
  5. 持续集成:将测试集成到CI/CD流水线中
  6. 报告生成:自动生成详细的测试报告和分析

通过遵循这些最佳实践,可以确保垂直领域大模型在部署到生产环境之前得到充分的验证和优化。

Logo

更多推荐