山东大学项目实训(6)AI服务器的测试

单发测试

首先记录一个我一直使用的单个对话测试程序

import requests
import json
import time
from datetime import datetime

# 测试配置 - 修改这里的参数进行测试
################################################################
# 模型名称 - 根据实际使用的模型进行修改
# 注意:不同模型可能需要不同的配置和参数
# MODEL_NAME = "DeepSeek-R1"  # 模型名称,默认使用 DeepSeek-R1
# MODEL_NAME = "DeepSeek-V3"  # 测试时根据实际修改
# MODEL_NAME="doubao-thinking"
# MODEL_NAME="doubao-thinking-vision"
MODEL_NAME = "qwen-plus"
# MODEL_NAME = "doubao-fine"  # 使用 doubao-fine 模型1
USE_STREAM = True  # 流式模式
SERVER_URL = "http://localhost:5000"
TEST_MESSAGE =  [   
                # 图片信息,希望模型理解的图片
                {"type": "image_url", "image_url": {"url":  "https://ark-project.tos-cn-beijing.volces.com/doc_image/ark_demo_img_1.png"},},
                # 文本消息,希望模型根据图片信息回答的问题
                {"type": "text", "text": "支持输入是图片的模型系列是哪个?"}, 
            ]#只有"doubao-thinking-vision"模型可用这个模式
TEST_MESSAGE = "按字节编址的计算机中,某double型数组A的首地址为2000H,使用变址寻址和循环结构访问数组A,保存数组下标的变址寄存器初值为0,每次循环取一个数组元素,其偏移地址为变址值乘以sizeof(double),取完后变址寄存器内容自动加1。若某次循环所取元素的地址为2100H,则进入该次循环时变址寄存器的内容是___。\\nA. 25 \tB. 32 \tC. 64 \tD. 100"  # 测试消息内容
################################################################

def test_chat():
    """测试聊天端点 - 改进版流式输出"""
    try:
        print(f"[{datetime.now().strftime('%H:%M:%S')}] 开始测试")
        print(f"模型: {MODEL_NAME} | 流式: {'是' if USE_STREAM else '否'}")
        print(f"问题: {TEST_MESSAGE}")
        
        payload = {
            "message": TEST_MESSAGE,
            "model": MODEL_NAME,
            "stream": USE_STREAM,
            "singleTurn": True
        }
        
        # 记录请求开始时间
        start_time = time.time()
        print(f"[{datetime.now().strftime('%H:%M:%S')}] 请求发送中...")
        
        with requests.post(
            f"{SERVER_URL}/chat",
            json=payload,
            stream=USE_STREAM,
            timeout=30
        ) as response:
            # 流式处理
            if USE_STREAM:
                print(f"\n[{datetime.now().strftime('%H:%M:%S')}] 流式响应开始:")
                print("="*50)
                
                full_response = ""
                received_chunks = 0
                
                for line in response.iter_lines():
                    # 计算响应时长
                    elapsed = time.time() - start_time
                    
                    if line:
                        decoded_line = line.decode('utf-8').strip()
                        
                        if decoded_line.startswith('data:'):
                            try:
                                chunk = json.loads(decoded_line[5:])
                                if content := chunk.get('content'):
                                    # 添加时间戳显示
                                    timestamp = f"[+{elapsed:.2f}s] "
                                    print(timestamp + content, end='', flush=True)
                                    full_response += content
                                    received_chunks += 1
                            except json.JSONDecodeError:
                                print(f"\n[解析错误] 行内容: {decoded_line}")
                        
                        # 处理非数据行(如心跳)
                        elif decoded_line:
                            print(f"\n[额外数据] {decoded_line}")
                
                # 输出最终结果统计
                print("\n" + "="*50)
                print(f"[{datetime.now().strftime('%H:%M:%S')}] 流式响应结束")
                print(f"接收块数: {received_chunks} | 总时长: {elapsed:.2f}秒")
                print(f"完整响应: {full_response}")
                
            else:
                # 非流式处理
                data = response.json()
                elapsed = time.time() - start_time
                print(f"\n[{datetime.now().strftime('%H:%M:%S')}] 响应时长: {elapsed:.2f}秒")
                print(json.dumps(data, indent=2, ensure_ascii=False))
        
        print("\n测试完成!")
        return True
        
    except Exception as e:
        print(f"\n[错误] 测试失败: {str(e)}")
        return False

if __name__ == "__main__":
    test_chat()

以上代码返回的首个tokens响应时间按服务商和时间段不同散布在0.2s到3s之间(排除极端情况),平均tokens生成速率均在20tokens/s之上,可以说没有此方面的瓶颈。

并发测试

为了测试系统在高并发情况下的性能,我们使用 Python 的 concurrent.futures 模块进行并发测试。以下是并发测试的代码:

import requests
import time
from concurrent.futures import ThreadPoolExecutor

# 服务器配置
SERVER_URL = "http://localhost:5000"
TEST_MESSAGE = "请解释一下人工智能的概念"
MODEL_NAME = "DeepSeek-R1"
USE_STREAM = False
SINGLE_TURN = True
NUM_REQUESTS = 100  # 并发请求数量
MAX_WORKERS = 5  # 线程池最大工作线程数

def send_request():
    payload = {
        "message": TEST_MESSAGE,
        "model": MODEL_NAME,
        "stream": USE_STREAM,
        "singleTurn": SINGLE_TURN
    }
    try:
        start_time = time.time()
        response = requests.post(
            f"{SERVER_URL}/chat",
            json=payload,
            stream=USE_STREAM,
            timeout=30
        )
        end_time = time.time()
        elapsed_time = end_time - start_time
        if response.status_code == 200:
            return True, elapsed_time
        else:
            return False, elapsed_time
    except Exception as e:
        return False, 0

def main():
    success_count = 0
    total_time = 0
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = [executor.submit(send_request) for _ in range(NUM_REQUESTS)]
        for future in futures:
            success, elapsed_time = future.result()
            if success:
                success_count += 1
                total_time += elapsed_time

    success_rate = success_count / NUM_REQUESTS * 100
    average_time = total_time / success_count if success_count > 0 else 0

    print(f"并发请求数量: {NUM_REQUESTS}")
    print(f"成功率: {success_rate:.2f}%")
    print(f"平均响应时间: {average_time:.2f} 秒")

if __name__ == "__main__":
    main()

以上代码能很好地反馈服务器延迟并进行统计,有利于我估计大模型服务商和我的服务器在各个步骤的延迟。测试证明,本地的服务器延迟不超过1s,其中绝大部分延迟都是http请求带来的。

通过并发测试,我们可以观察系统在多个请求同时到达时的响应时间、吞吐量等性能指标,从而发现系统可能存在的瓶颈,并进行相应的优化。并发测试参考以上的单发测试代码和服务商的文档。

据我在不同时间的测试,我们学校的服务器并发量不超过4,且在瞬间涌入多发时会拒绝所有请求。别的服务商(如阿里,字节等)由于小组只有能力接入初级的api,并发量也不大。

不过,我的代码仍为并发请求做出了处理,在日后如果项目有足够的AI算力资源支持,我的服务器有不成为瓶颈 的保障。

Logo

更多推荐