1. 项目概述:当AI智能体遇上性能压测

最近在折腾AI智能体(Agent)项目,从简单的客服机器人到复杂的自动化工作流,团队里好几个项目都上线了。功能跑通大家都很兴奋,但老板随口问了一句:“这玩意儿能抗住多少用户同时用?” 会议室瞬间安静了。确实,AI智能体不再是简单的API调用,它背后往往串联着大模型、工具调用、记忆存储、逻辑判断等一系列复杂操作,一个用户请求可能触发几十次内部交互。传统的接口性能测试工具,像JMeter,对付这种状态复杂、异步回调多的智能体,有点力不从心。于是,我把目光投向了 Locust ——一个用Python写的开源负载测试工具。它用代码定义用户行为,天生适合模拟智能体那种“思考-行动-观察”的复杂流程。这篇文章,我就结合最近给一个“智能数据分析Agent”做压测的实战,聊聊怎么用Locust给AI智能体“上强度”,看看它在压力下到底表现如何。

简单说,这个测试的目标很明确:模拟大量虚拟用户同时与我们的智能体交互,观察其响应时间、吞吐量、错误率等关键指标,找到性能瓶颈。这适合所有正在开发或已经部署了AI智能体的开发者、测试工程师和运维同学。无论你是用LangChain、AutoGen、Dify还是扣子(Coze)平台搭建的智能体,只要它能通过HTTP或WebSocket对外提供服务,这套方法就能帮你摸清它的“底细”。

2. 为什么是Locust?AI智能体性能测试的特殊性

在聊具体操作之前,得先搞清楚,为什么给AI智能体做性能测试不能简单套用传统Web接口那套。AI智能体,比如一个能帮你查数据、做图表、写总结的Agent,它的工作流(Workflow)通常是异步、多步且有状态的。

传统接口测试的局限 :像JMeter这类工具,擅长模拟对静态URL发起HTTP请求,然后断言响应。但智能体的交互往往是“会话式”的。用户说“帮我分析上周销售数据”,智能体可能先调用工具查询数据库,再把结果扔给大模型生成洞察,最后可能还会调用另一个工具生成图表。这个过程涉及多次内部API调用和可能的长时等待(等大模型生成)。JMeter要模拟这种逻辑链,需要写大量复杂的逻辑控制器和后置处理器,脚本维护成本高,而且对异步回调的支持不够直观。

Locust的优势所在 :Locust的核心哲学是“用代码定义用户行为”。测试场景就是一个Python脚本,你可以在里面自由地使用 requests aiohttp websockets 等任何Python库,像写普通业务逻辑一样编写虚拟用户(User)的行为。这对于AI智能体测试简直是绝配:

  1. 逻辑编排自由 :你可以轻松实现“发送消息 -> 等待并轮询结果 -> 根据结果内容决定下一步”这种复杂流程。
  2. 处理异步与长轮询 :智能体处理复杂任务时,服务端可能先返回一个任务ID,客户端需要轮询另一个接口获取最终结果。用Python的 time.sleep 配合循环判断,在Locust里很容易实现。
  3. 状态保持 :一个虚拟用户(User)实例在整个运行期间都存在,可以很方便地在 self (即User实例)上存储会话ID、任务ID、上下文等状态信息,模拟真实用户的连续对话。
  4. 分布式与扩展性 :Locust原生支持分布式运行,用一台Master机协调多台Worker机产生海量并发,轻松模拟上万甚至数十万用户,这对评估智能体服务的扩容能力至关重要。

注意 :选择Locust也意味着测试团队需要具备一定的Python编程能力。但这对于AI智能体开发团队来说通常不是问题,甚至可以利用已有的业务逻辑代码来快速构建测试脚本。

3. 测试环境与目标智能体定义

在开始写压测脚本前,必须明确测试对象和测试环境。我这次的目标是一个内部开发的“智能数据分析Agent”,它的架构很典型:

  • 接口 :提供一个HTTP POST接口 /v1/chat/completions ,接收用户查询(自然语言)。
  • 内部流程 :接收到查询后,Agent会先进行意图识别,然后可能顺序或并行调用:1)SQL查询服务;2)大模型服务(用于数据解读);3)图表生成服务。
  • 响应格式 :由于处理耗时较长,服务采用异步响应。立即返回一个 {"task_id": "xxx", "status": "processing"} ,客户端需要通过另一个GET接口 /v1/tasks/{task_id} 轮询获取最终结果。

我们的性能测试目标

  • 核心指标
    • 响应时间 :从发送请求到收到最终成功响应的平均时间、95分位时间(P95)。
    • 吞吐量(RPS) :每秒系统能成功处理的完整用户请求数。
    • 错误率 :任务失败(如超时、内部错误)的比例。
    • 资源监控 :服务所在服务器的CPU、内存、GPU(如果用到)使用率。
  • 测试场景
    • 基准测试 :低并发(如10个用户),验证脚本和流程是否正确,获取基线性能数据。
    • 负载测试 :逐步增加并发用户数(如50, 100, 200),观察系统性能变化,找到最佳并发点。
    • 压力测试 :在负载测试确定的临界点之上,继续增加压力,直到系统出现错误率飙升或响应时间不可接受,找到系统瓶颈。

环境准备

  1. Locust安装 pip install locust
  2. 测试机 :需要与生产环境网络互通。如果模拟高并发,建议测试机本身配置不能太差,避免成为瓶颈。我使用了一台4核8G的云服务器。
  3. 监控工具 :除了Locust自带的Web UI报表,还需要监控服务端资源。我用的是 Prometheus + Grafana (服务端已集成),也可以简单使用 htop nvidia-smi (针对GPU)在服务器上直接观察。

4. 编写Locust测试脚本:模拟智能体交互

这是最核心的一步。我们创建一个名为 ai_agent_load_test.py 的文件。

4.1 定义用户行为与任务

Locust的核心是定义 HttpUser 类,并在其中通过 @task 装饰器定义任务。

from locust import HttpUser, task, between, events
import time
import json
import logging

class AIAgentUser(HttpUser):
    # 用户在每个任务执行后,等待1-3秒的随机时间,模拟用户思考间隔
    wait_time = between(1, 3)

    def on_start(self):
        """模拟用户开始一个会话,可以进行一些初始化,比如登录(如果需要)"""
        self.task_id = None
        self.query_list = [
            "请总结上周的销售额趋势",
            "对比一下华东区和华北区本季度的利润率",
            "生成一份关于产品A用户反馈的摘要报告"
        ]
        logging.info("虚拟用户启动,准备发起请求。")

    @task(weight=3) # weight表示任务权重,权重越高,被执行的频率越高
    def test_complex_analysis(self):
        """测试一个复杂的分析任务,涉及多步异步处理"""
        query = self.query_list[0] # 简单起见,这里固定用第一个查询
        payload = {
            "message": query,
            "stream": False
        }
        headers = {"Content-Type": "application/json"}

        # 1. 发起初始请求
        with self.client.post("/v1/chat/completions", json=payload, headers=headers, catch_response=True) as init_response:
            if init_response.status_code != 200:
                init_response.failure(f"初始请求失败: {init_response.status_code}")
                return
            try:
                data = init_response.json()
                self.task_id = data.get("task_id")
                if not self.task_id:
                    init_response.failure("响应中未找到task_id")
                    return
                init_response.success()
            except json.JSONDecodeError:
                init_response.failure("响应不是有效的JSON")
                return

        # 2. 轮询任务结果,最多轮询10次,每次间隔2秒
        max_polls = 10
        for i in range(max_polls):
            time.sleep(2) # 轮询间隔,模拟等待
            with self.client.get(f"/v1/tasks/{self.task_id}", catch_response=True) as poll_response:
                if poll_response.status_code != 200:
                    poll_response.failure(f"轮询请求失败: {poll_response.status_code}")
                    continue # 或者直接失败,取决于业务逻辑

                data = poll_response.json()
                status = data.get("status")
                if status == "completed":
                    # 检查最终结果是否有效
                    result = data.get("result")
                    if result and len(result) > 10: # 简单有效性检查
                        poll_response.success()
                        logging.info(f"任务 {self.task_id} 完成,结果长度: {len(result)}")
                    else:
                        poll_response.failure("任务完成但结果无效")
                    break # 任务完成,退出轮询
                elif status == "failed":
                    poll_response.failure(f"任务处理失败: {data.get('error', '未知错误')}")
                    break
                elif status == "processing":
                    poll_response.success() # 轮询本身是成功的
                    if i == max_polls - 1:
                        # 达到最大轮询次数仍未完成
                        poll_response.failure("任务处理超时")
                else:
                    poll_response.failure(f"未知的任务状态: {status}")
                    break
        else:
            # 如果循环正常结束(非break),说明超时
            self.environment.runner.stats.log_error("GET", f"/v1/tasks/{self.task_id}")
            logging.error(f"任务 {self.task_id} 轮询超时。")

    @task(weight=1)
    def test_simple_query(self):
        """测试一个简单的、期望快速返回的查询"""
        payload = {
            "message": "当前时间是什么?",
            "stream": False
        }
        with self.client.post("/v1/chat/completions", json=payload, catch_response=True) as response:
            # 对于简单查询,我们假设它应该同步快速返回,不涉及异步任务
            if response.status_code == 200:
                data = response.json()
                # 这里根据实际业务逻辑判断成功,例如直接返回了答案
                if data.get("content"):
                    response.success()
                else:
                    response.failure("响应内容为空")
            else:
                response.failure(f"请求失败: {response.status_code}")

脚本关键点解析

  • catch_response=True :这是关键参数,它允许我们手动控制请求的成功与失败,而不是仅依赖HTTP状态码。对于轮询接口,即使返回200,但任务状态是 processing ,我们也不应将其计为“业务成功”。
  • 灵活的断言 :在 with 语句块内,我们可以对响应内容进行任意复杂的检查( response.json() ),并调用 response.success() response.failure() 来告知Locust统计结果。这完美契合了智能体业务逻辑验证的需求。
  • 状态保持 self.task_id 存储在User实例上,在同一个用户的不同任务步骤间传递。
  • 多任务权重 :通过 @task(weight=3) @task(weight=1) ,我们可以模拟用户行为的不同比例。例如,复杂分析任务可能占70%,简单查询占30%。

4.2 添加测试数据与钩子

为了让测试更真实,我们可以从文件读取更多样的查询语句,并使用Locust的事件钩子。

import csv
from locust import events
from locust.runners import MasterRunner

# 读取测试数据
def load_test_queries():
    queries = []
    # 假设我们有一个 queries.csv 文件,里面有很多不同的用户问题
    try:
        with open('queries.csv', 'r', encoding='utf-8') as f:
            reader = csv.reader(f)
            for row in reader:
                if row:
                    queries.append(row[0])
    except FileNotFoundError:
        # 如果文件不存在,使用默认数据
        queries = ["查询销售额", "分析用户增长", "预测下周趋势", "生成月度报告"]
    return queries

# 存储全局测试数据
test_queries = load_test_queries()

class AIAgentUser(HttpUser):
    wait_time = between(1, 3)

    def on_start(self):
        # 每个虚拟用户随机分配一些查询,增加多样性
        import random
        self.my_queries = random.sample(test_queries, min(3, len(test_queries)))
        self.current_query_index = 0

    def get_next_query(self):
        """轮询获取下一个查询语句"""
        query = self.my_queries[self.current_query_index]
        self.current_query_index = (self.current_query_index + 1) % len(self.my_queries)
        return query

    @task
    def test_agent(self):
        query = self.get_next_query()
        # ... 后续逻辑与之前类似,使用query变量 ...

添加启动前准备钩子

@events.test_start.add_listener
def on_test_start(environment, **kwargs):
    """在测试开始时运行,用于初始化全局资源(仅在Master节点)"""
    if isinstance(environment.runner, MasterRunner):
        print("性能测试即将开始,初始化完成。")
        # 这里可以连接数据库、初始化全局变量等

5. 执行测试与结果分析

脚本准备好后,就可以开始压测了。

5.1 启动Locust

在脚本所在目录,打开终端。

方式一:使用Web UI(推荐用于调试和观察)

locust -f ai_agent_load_test.py --host=http://your-ai-agent-service.com

然后浏览器打开 http://localhost:8089

  • Number of users :要模拟的总用户数。
  • Spawn rate :每秒启动多少个用户。
  • Host :目标服务器的地址。

在Web UI中点击“Start swarming”即可开始。UI界面会实时展示RPS、响应时间、失败率等,非常直观。

方式二:无头模式(用于自动化或CI/CD)

locust -f ai_agent_load_test.py --host=http://your-ai-agent-service.com --headless -u 100 -r 10 --run-time 5m --html report.html
  • --headless :无头模式,不启动Web UI。
  • -u 100 :模拟100个用户。
  • -r 10 :每秒启动10个用户。
  • --run-time 5m :运行5分钟。
  • --html report.html :生成HTML格式的测试报告。

5.2 关键指标解读与瓶颈分析

测试运行后,我们需要关注Locust报告和服务器监控。

Locust报告核心指标

  1. RPS (Requests per second) :注意区分是“轮询请求”的RPS还是“完整业务请求”的RPS。在我们的脚本中,一个完整的 test_complex_analysis 任务包含了1次POST和最多10次GET。Locust默认统计每个HTTP请求。为了得到“业务吞吐量”,我们需要关注自定义的“任务执行成功数/时间”。可以在脚本中通过自定义事件来统计,或者更简单地,在分析时进行换算。
  2. 响应时间(Avg, P95, P99) :这是最重要的指标之一。P95响应时间意味着95%的请求在这个时间内完成。对于AI智能体,如果P95时间超过5-10秒,用户体验就会很差。需要重点关注 /v1/tasks/{id} 这个轮询接口的P95时间,它直接反映了智能体处理链路的耗时。
  3. 失败率(Fails) :理想情况下应为0%。任何非零的失败率都需要排查原因,是网络超时、服务内部错误,还是我们脚本中的断言条件太严格?

结合服务器监控定位瓶颈

  • CPU/内存使用率飙升 :如果并发上升时,服务器CPU持续高于80%或内存使用率不断增长,可能是业务逻辑代码或框架本身存在效率问题,或者需要水平扩容。
  • 数据库连接数或慢查询 :如果智能体频繁查询数据库,这很可能成为瓶颈。监控数据库的活跃连接数和慢查询日志。
  • 大模型服务延迟 :这是AI智能体特有的瓶颈。监控大模型API的调用延迟和令牌(Token)消耗速度。如果并发请求导致大模型API排队或限流,响应时间就会急剧上升。 解决方案 可能是引入缓存(对常见问题缓存答案)、优化提示词(Prompt)以减少Token消耗、或使用更快的模型。
  • 网络延迟 :如果智能体需要调用多个外部服务(如不同的工具API),网络往返时间(RTT)会叠加。考虑服务部署在同一内网,或优化调用链,将可并行化的工具调用改为并行。

实操心得 :在一次测试中,我发现当并发用户达到150时,P95响应时间从3秒陡增到25秒。查看服务器监控,CPU和内存都很平稳。最后排查发现,是大模型服务提供方的API有每秒请求数(QPS)限制,我们的智能体触发了限流,导致大量请求在队列中等待。解决方案是我们在智能体服务层增加了请求队列和限流策略,并考虑引入模型缓存层。

6. 高级策略与常见问题排查

6.1 模拟更真实的用户行为

基础的脚本模拟了用户轮流执行任务。为了更真实,我们可以:

  • 思考时间随机化 wait_time = between(0.5, 5) 让用户“思考”时间更随机。
  • 任务序列化 :模拟用户连续进行多轮对话。
class ConversationalUser(HttpUser):
    def on_start(self):
        self.conversation_history = []

    @task
    def multi_turn_chat(self):
        # 第一轮
        response1 = self.send_message("你好")
        self.conversation_history.append(response1)
        # 基于历史进行第二轮
        time.sleep(2)
        follow_up = f"根据你刚才说的,那么具体数据是多少?上下文:{self.conversation_history[-1]}"
        self.send_message(follow_up)
  • 参数化与数据驱动 :从外部文件或数据库读取成千上万条不同的用户输入,避免因重复查询导致服务端缓存影响测试结果。

6.2 分布式压测

当单台测试机无法模拟足够高的并发时,就需要分布式压测。

  1. 启动一台Master: locust -f ai_agent_load_test.py --master
  2. 在多台Worker机器上启动Worker: locust -f ai_agent_load_test.py --worker --master-host=<MASTER_IP> 所有Worker的压力数据会汇总到Master的Web UI上。

6.3 常见问题与排查技巧

问题1:Locust报告大量“Connection refused”或“Timeout”错误。

  • 排查 :首先检查 --host 参数是否正确,网络是否通畅。然后检查目标服务是否已经崩溃或过载。使用 netstat ss 命令查看服务端口监听状态,用 top 命令查看服务进程资源使用情况。
  • 解决 :确保服务健康,逐步增加并发数,观察服务承受能力。

问题2:任务轮询逻辑导致测试时间过长,虚拟用户堆积。

  • 排查 test_complex_analysis 任务中,如果每个任务都轮询10次,每次等2秒,那么一个用户执行一次任务就可能占用20多秒。在高并发下,Locust会快速孵化出大量用户,但每个用户都被长时间的任务阻塞着,导致实际并发请求数并不高,但内存中积累了大量的User实例。
  • 解决 :调整 wait_time 和轮询策略。或者,使用两个不同的User类来模拟不同行为: FastUser (只做简单查询)和 SlowUser (做复杂分析),并控制它们的比例。

问题3:如何准确统计“业务成功率”而非“HTTP请求成功率”?

  • 方案 :利用Locust的自定义事件和全局字典。在任务成功完成时(比如收到 status: completed ),触发一个自定义事件,并记录到全局统计中。
from locust import events
from locust.runners import MasterRunner

business_success_count = 0
business_failure_count = 0

@events.request.add_listener
def on_business_success(request_type, name, response_time, response_length, **kwargs):
    # 这里需要根据实际业务逻辑来判断,示例中简单通过一个标志位
    # 更佳实践是在Task中设置一个上下文变量
    pass

# 在User类中,任务真正成功时
# business_success_count += 1

更简单直接的方法是,在测试结束后,分析Locust生成的CSV日志文件,根据URL和响应内容进行过滤和统计。

问题4:测试过程中,如何动态调整负载?

  • 方案 :Locust的Web UI提供了动态调整并发用户数和孵化速率的功能。这是其一大优势。你可以在测试运行时,根据服务器监控指标,实时增加或减少用户数,观察系统的弹性。

给AI智能体做性能测试,远不止是发个请求那么简单。它要求测试脚本能精准模拟出智能体复杂的、有状态的、异步的工作流。Locust凭借其代码化的灵活性,在这方面优势明显。通过这次实践,我们不仅得到了服务的性能基线,更重要的是发现了隐藏在大模型调用链路上的瓶颈。性能测试不是一次性的任务,而应该随着智能体功能的迭代持续进行。下次当你给智能体添加一个新工具(Tool)或者更换底层模型时,别忘了再用Locust“拷问”它一番,确保用户体验始终在线。

更多推荐