山东大学项目实训(6)AI服务器的测试
山东大学项目实训记录AI服务器测试过程,包含单发和并发两种测试方式。单发测试使用Python脚本验证模型响应性能,支持流式和非流式输出,实测平均token生成速率超过20个/s。并发测试通过ThreadPoolExecutor模拟高负载场景,配置100个并发请求和5个工作线程,评估服务器在多用户访问时的稳定性。测试结果显示系统在首token响应时间和持续生成速度方面表现良好,未发现明显性能瓶颈。测
·
山东大学项目实训(6)AI服务器的测试
文章目录
单发测试
首先记录一个我一直使用的单个对话测试程序
import requests
import json
import time
from datetime import datetime
# 测试配置 - 修改这里的参数进行测试
################################################################
# 模型名称 - 根据实际使用的模型进行修改
# 注意:不同模型可能需要不同的配置和参数
# MODEL_NAME = "DeepSeek-R1" # 模型名称,默认使用 DeepSeek-R1
# MODEL_NAME = "DeepSeek-V3" # 测试时根据实际修改
# MODEL_NAME="doubao-thinking"
# MODEL_NAME="doubao-thinking-vision"
MODEL_NAME = "qwen-plus"
# MODEL_NAME = "doubao-fine" # 使用 doubao-fine 模型1
USE_STREAM = True # 流式模式
SERVER_URL = "http://localhost:5000"
TEST_MESSAGE = [
# 图片信息,希望模型理解的图片
{"type": "image_url", "image_url": {"url": "https://ark-project.tos-cn-beijing.volces.com/doc_image/ark_demo_img_1.png"},},
# 文本消息,希望模型根据图片信息回答的问题
{"type": "text", "text": "支持输入是图片的模型系列是哪个?"},
]#只有"doubao-thinking-vision"模型可用这个模式
TEST_MESSAGE = "按字节编址的计算机中,某double型数组A的首地址为2000H,使用变址寻址和循环结构访问数组A,保存数组下标的变址寄存器初值为0,每次循环取一个数组元素,其偏移地址为变址值乘以sizeof(double),取完后变址寄存器内容自动加1。若某次循环所取元素的地址为2100H,则进入该次循环时变址寄存器的内容是___。\\nA. 25 \tB. 32 \tC. 64 \tD. 100" # 测试消息内容
################################################################
def test_chat():
"""测试聊天端点 - 改进版流式输出"""
try:
print(f"[{datetime.now().strftime('%H:%M:%S')}] 开始测试")
print(f"模型: {MODEL_NAME} | 流式: {'是' if USE_STREAM else '否'}")
print(f"问题: {TEST_MESSAGE}")
payload = {
"message": TEST_MESSAGE,
"model": MODEL_NAME,
"stream": USE_STREAM,
"singleTurn": True
}
# 记录请求开始时间
start_time = time.time()
print(f"[{datetime.now().strftime('%H:%M:%S')}] 请求发送中...")
with requests.post(
f"{SERVER_URL}/chat",
json=payload,
stream=USE_STREAM,
timeout=30
) as response:
# 流式处理
if USE_STREAM:
print(f"\n[{datetime.now().strftime('%H:%M:%S')}] 流式响应开始:")
print("="*50)
full_response = ""
received_chunks = 0
for line in response.iter_lines():
# 计算响应时长
elapsed = time.time() - start_time
if line:
decoded_line = line.decode('utf-8').strip()
if decoded_line.startswith('data:'):
try:
chunk = json.loads(decoded_line[5:])
if content := chunk.get('content'):
# 添加时间戳显示
timestamp = f"[+{elapsed:.2f}s] "
print(timestamp + content, end='', flush=True)
full_response += content
received_chunks += 1
except json.JSONDecodeError:
print(f"\n[解析错误] 行内容: {decoded_line}")
# 处理非数据行(如心跳)
elif decoded_line:
print(f"\n[额外数据] {decoded_line}")
# 输出最终结果统计
print("\n" + "="*50)
print(f"[{datetime.now().strftime('%H:%M:%S')}] 流式响应结束")
print(f"接收块数: {received_chunks} | 总时长: {elapsed:.2f}秒")
print(f"完整响应: {full_response}")
else:
# 非流式处理
data = response.json()
elapsed = time.time() - start_time
print(f"\n[{datetime.now().strftime('%H:%M:%S')}] 响应时长: {elapsed:.2f}秒")
print(json.dumps(data, indent=2, ensure_ascii=False))
print("\n测试完成!")
return True
except Exception as e:
print(f"\n[错误] 测试失败: {str(e)}")
return False
if __name__ == "__main__":
test_chat()
以上代码返回的首个tokens响应时间按服务商和时间段不同散布在0.2s到3s之间(排除极端情况),平均tokens生成速率均在20tokens/s之上,可以说没有此方面的瓶颈。
并发测试
为了测试系统在高并发情况下的性能,我们使用 Python 的 concurrent.futures 模块进行并发测试。以下是并发测试的代码:
import requests
import time
from concurrent.futures import ThreadPoolExecutor
# 服务器配置
SERVER_URL = "http://localhost:5000"
TEST_MESSAGE = "请解释一下人工智能的概念"
MODEL_NAME = "DeepSeek-R1"
USE_STREAM = False
SINGLE_TURN = True
NUM_REQUESTS = 100 # 并发请求数量
MAX_WORKERS = 5 # 线程池最大工作线程数
def send_request():
payload = {
"message": TEST_MESSAGE,
"model": MODEL_NAME,
"stream": USE_STREAM,
"singleTurn": SINGLE_TURN
}
try:
start_time = time.time()
response = requests.post(
f"{SERVER_URL}/chat",
json=payload,
stream=USE_STREAM,
timeout=30
)
end_time = time.time()
elapsed_time = end_time - start_time
if response.status_code == 200:
return True, elapsed_time
else:
return False, elapsed_time
except Exception as e:
return False, 0
def main():
success_count = 0
total_time = 0
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
futures = [executor.submit(send_request) for _ in range(NUM_REQUESTS)]
for future in futures:
success, elapsed_time = future.result()
if success:
success_count += 1
total_time += elapsed_time
success_rate = success_count / NUM_REQUESTS * 100
average_time = total_time / success_count if success_count > 0 else 0
print(f"并发请求数量: {NUM_REQUESTS}")
print(f"成功率: {success_rate:.2f}%")
print(f"平均响应时间: {average_time:.2f} 秒")
if __name__ == "__main__":
main()
以上代码能很好地反馈服务器延迟并进行统计,有利于我估计大模型服务商和我的服务器在各个步骤的延迟。测试证明,本地的服务器延迟不超过1s,其中绝大部分延迟都是http请求带来的。
通过并发测试,我们可以观察系统在多个请求同时到达时的响应时间、吞吐量等性能指标,从而发现系统可能存在的瓶颈,并进行相应的优化。并发测试参考以上的单发测试代码和服务商的文档。
据我在不同时间的测试,我们学校的服务器并发量不超过4,且在瞬间涌入多发时会拒绝所有请求。别的服务商(如阿里,字节等)由于小组只有能力接入初级的api,并发量也不大。
不过,我的代码仍为并发请求做出了处理,在日后如果项目有足够的AI算力资源支持,我的服务器有不成为瓶颈 的保障。
更多推荐




所有评论(0)