1. 项目概述:为什么是Locust?

如果你正在寻找一个能让你用Python代码来定义用户行为的压测工具,并且希望它足够轻量、可扩展,还能轻松玩转分布式,那Locust几乎就是为你量身定做的。它不是另一个需要你反复点击配置的图形化工具,而是一个将压测逻辑完全代码化的框架。这意味着,你的测试场景就是一段Python脚本,版本可控、逻辑清晰、复用性强。对于开发者和测试工程师来说,这极大地提升了编写和维护复杂压测场景的效率。

Locust的核心思想是“用代码模拟用户”。每个虚拟用户(我们称之为“蝗虫”)都是一个独立的Python协程,它们按照你编写的 TaskSet (任务集)中定义的行为模式去“攻击”你的系统。从简单的HTTP接口到复杂的WebSocket、gRPC,只要你能用Python发起请求,Locust就能帮你模拟压力。更关键的是,它的分布式架构设计得非常简洁:一个 master 节点负责协调和收集数据,多个 worker 节点负责真正地产生负载,通过简单的命令行参数就能拉起一个庞大的压测集群。相比于一些传统工具,它避免了中心化控制器的性能瓶颈,让压测能力可以随着 worker 节点的增加近乎线性地扩展。

2. 核心概念与快速入门

2.1 环境搭建与第一个脚本

上手Locust的第一步是安装。由于Locust完全基于Python,所以通过pip安装是最直接的方式。我强烈建议你使用虚拟环境(如 venv conda )来管理依赖,避免污染全局环境。

# 创建并激活虚拟环境(以venv为例)
python -m venv locust_env
source locust_env/bin/activate  # Linux/macOS
# locust_env\Scripts\activate  # Windows

# 安装Locust
pip install locust

安装完成后,验证一下: locust --version 。接下来,我们编写第一个也是最简单的压测脚本,命名为 locustfile.py 。这个文件名是Locust默认寻找的入口文件。

from locust import HttpUser, task, between

class QuickstartUser(HttpUser):
    # between用于设置用户执行每个任务后等待的随机时间范围(秒)
    wait_time = between(1, 5)

    # 用@task装饰器标记这是一个用户任务,权重值越大,被执行的概率越高
    @task(3) # 此任务权重为3
    def view_items(self):
        # 模拟用户浏览商品列表
        self.client.get("/api/items")
        # self.client是HttpUser内置的HttpSession实例,用法类似requests

    @task(1) # 此任务权重为1
    def view_item_detail(self):
        # 模拟用户查看某个商品详情,这里以商品ID 1为例
        self.client.get("/api/items/1")

    # on_start方法会在每个虚拟用户开始运行时执行一次,常用于登录等初始化操作
    def on_start(self):
        # 这里可以模拟登录,获取token等
        # response = self.client.post("/login", json={"username":"foo", "password":"bar"})
        # self.token = response.json().get("token")
        pass

这个脚本定义了一类用户 QuickstartUser ,他们有两种行为:以3倍的概率浏览商品列表,和以1倍的概率查看商品详情。每次操作后,会等待1到5秒的随机时间,模拟用户思考或阅读的间隔。

注意: locustfile.py 必须位于你启动Locust命令的当前目录,或者你可以通过 -f 参数指定脚本路径。 HttpUser 是Locust为HTTP/HTTPS测试提供的便捷类,如果你的被测系统是其他协议(如WebSocket),需要继承基础的 User 类并自行实现客户端。

2.2 核心对象深度解析:User, TaskSet与Client

理解Locust的三大核心对象,是编写高效压测脚本的关键。

1. User类:虚拟用户的蓝图 你编写的每一个继承自 User (或 HttpUser )的类,都定义了一类虚拟用户的行为模式。Locust会为每一类用户生成指定数量的实例(协程),每个实例独立运行,互不干扰。 wait_time 属性决定了用户执行完一个任务后如何等待,除了 between ,还有 constant (固定等待)和 constant_pacing (固定节奏,确保任务执行间隔至少为指定值)等策略。

2. TaskSet类:任务编排的容器 TaskSet 用于将多个任务组织在一起,形成更复杂、有层次的用户行为流。例如,一个电商用户的行为可以拆解为“浏览任务集”和“下单任务集”。在 TaskSet 内部,你可以嵌套其他 TaskSet ,实现行为的跳转。

from locust import HttpUser, task, TaskSet, between

class BrowseProducts(TaskSet):
    @task(2)
    def list_products(self):
        self.client.get("/products")

    @task(1)
    def get_product_detail(self):
        self.client.get("/products/123")

    @task(1)
    def stop_browsing(self):
        # 通过self.interrupt()可以中断当前TaskSet,返回到父级
        self.interrupt()

class PlaceOrder(TaskSet):
    @task
    def submit_order(self):
        self.client.post("/orders", json={"item": "book"})

    @task
    def on_stop(self):
        # 当TaskSet停止时,会执行on_stop方法
        print("Order task finished.")

class WebsiteUser(HttpUser):
    wait_time = between(2, 5)
    # tasks是一个列表,可以包含TaskSet类或普通任务函数,以及它们的权重
    tasks = [BrowseProducts, PlaceOrder] # 用户有50%概率进入BrowseProducts,50%进入PlaceOrder

    # 也可以使用字典形式指定权重
    # tasks = {BrowseProducts: 2, PlaceOrder: 1} # 2/3概率进入BrowseProducts,1/3进入PlaceOrder

3. Client:发起请求的引擎 对于 HttpUser self.client 是一个 HttpSession 对象,它封装了HTTP请求,并自动记录每次请求的响应时间、状态码等数据,汇总到Locust的统计报告中。它的API与流行的 requests 库高度相似,支持GET、POST、PUT、DELETE等各种方法,以及headers、json、auth等参数,学习成本极低。关键在于,所有由 self.client 发起的请求都会被Locust监控。

3. 脚本编写进阶与实战技巧

3.1 模拟真实用户行为:参数化、关联与思考时间

一个有效的压力测试必须逼近真实用户,而不是简单的重复请求。这涉及到几个关键技巧。

参数化数据: 让每次请求都不一样。你可以从文件中读取,或者动态生成。

import csv
from locust import HttpUser, task, between
import random

class ParameterizedUser(HttpUser):
    wait_time = between(1, 3)

    def on_start(self):
        # 从CSV文件读取测试数据,例如用户名和商品ID
        with open('user_credentials.csv', 'r') as f:
            reader = csv.DictReader(f)
            self.users = list(reader)
        self.current_user = random.choice(self.users)

    @task
    def login_and_view(self):
        # 使用参数化数据
        uid = self.current_user['id']
        # 模拟登录(带参数)
        with self.client.post("/login", json={"username": self.current_user['username']}, catch_response=True) as response:
            if response.status_code == 200:
                self.token = response.json()['token']
                # 将token设置到后续请求的header中
                self.client.headers = {'Authorization': f'Bearer {self.token}'}
            else:
                response.failure(f"Login failed for {self.current_user['username']}")
        # 查看该用户相关的订单
        self.client.get(f"/orders?user_id={uid}")

请求关联: 处理一个请求的响应结果是下一个请求的输入,最常见的就是登录后获取token。如上例所示,使用 catch_response=True 上下文管理器可以捕获响应并进行验证、提取数据。

思考时间与步调时间: wait_time 不仅仅是随机等待。 constant_pacing 模式非常有用,它能确保单个用户执行一个任务循环(比如“登录-浏览-下单”)的总时间至少是你设定的值,这对于模拟有固定操作节奏的业务场景(如抢购)至关重要,可以更精确地控制每秒事务数(TPS)的上限。

3.2 测试非HTTP协议:WebSocket与自定义Client

Locust的强大之处在于其灵活性。测试WebSocket服务时,你需要继承基础的 User 类,并使用如 websocket-client 这样的库来实现客户端逻辑。

import json
import time
from locust import User, task, events, constant
import websocket
from threading import Thread

class WebSocketClient:
    def __init__(self, host):
        self.ws = websocket.WebSocket()
        self.host = host

    def connect(self):
        start_time = time.time()
        try:
            self.ws.connect(f"ws://{self.host}/chat")
            events.request_success.fire(request_type="WS Connect", name="Connect", response_time=int((time.time()-start_time)*1000), response_length=0)
        except Exception as e:
            events.request_failure.fire(request_type="WS Connect", name="Connect", response_time=int((time.time()-start_time)*1000), exception=e)

    def send(self, message):
        start_time = time.time()
        try:
            self.ws.send(json.dumps(message))
            # 这里简单假设发送成功,实际可能需要接收回复来确认
            events.request_success.fire(request_type="WS Send", name="Send Message", response_time=int((time.time()-start_time)*1000), response_length=len(str(message)))
        except Exception as e:
            events.request_failure.fire(request_type="WS Send", name="Send Message", response_time=int((time.time()-start_time)*1000), exception=e)

    def close(self):
        self.ws.close()

class WebSocketUser(User):
    abstract = True # 这是一个抽象类,Locust不会直接实例化它
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.client = WebSocketClient(self.host)

    def on_start(self):
        self.client.connect()

    def on_stop(self):
        self.client.close()

class ChatUser(WebSocketUser):
    host = "localhost:8080"
    wait_time = constant(1)

    @task
    def send_chat(self):
        self.client.send({"type": "chat", "content": "Hello from Locust!"})

这里的关键是:

  1. 封装一个自定义的 WebSocketClient ,负责连接、发送、接收。
  2. 在自定义的 WebSocketUser 中管理这个客户端的生命周期( on_start , on_stop )。
  3. 使用 events.request_success.fire events.request_failure.fire 手动向Locust报告成功或失败,这样你的WebSocket请求才会出现在Locust的统计报告中。

实操心得:测试非标准协议时,手动触发事件报告是核心。务必准确计算 response_time (单位毫秒),这决定了统计报告中响应时间数据的准确性。对于需要等待服务器响应的操作,建议在收到响应后再触发 request_success

3.3 利用事件钩子与自定义统计

Locust的事件钩子( events )系统允许你在测试生命周期的各个阶段注入自定义逻辑,这为高级监控和定制化报告打开了大门。

初始化事件: 在测试开始前准备测试数据或连接外部监控系统。

from locust import events
import pandas as pd

@events.init.add_listener
def on_locust_init(environment, **kwargs):
    print("Locust初始化...")
    # 例如,加载一个大的测试数据文件到环境变量中,所有worker共享
    if not hasattr(environment, 'test_data'):
        environment.test_data = pd.read_csv('large_test_data.csv')

请求事件: 除了上面手动触发,你也可以监听所有请求,进行额外的处理或过滤。

@events.request.add_listener
def on_request(request_type, name, response_time, response_length, exception, context, **kwargs):
    if exception:
        print(f"请求失败: {name}, 异常: {exception}")
    # 可以在这里将数据发送到外部时序数据库,如InfluxDB

自定义统计: 你可能想追踪一些业务指标,如下单成功率、特定业务接口的TPS。

from locust import stats
import time

class BizStats:
    order_success = 0
    order_failure = 0

    @staticmethod
    def record_order(success):
        if success:
            BizStats.order_success += 1
        else:
            BizStats.order_failure += 1

# 在你的任务中调用
@task
def place_order(self):
    start_time = time.time()
    try:
        # ... 下单逻辑
        BizStats.record_order(True)
    except:
        BizStats.record_order(False)

# 通过事件在测试停止时打印自定义统计
@events.quitting.add_listener
def on_quitting(environment, **kw):
    print(f"\n自定义业务统计:")
    print(f"  下单成功: {BizStats.order_success}")
    print(f"  下单失败: {BizStats.order_failure}")
    if BizStats.order_success + BizStats.order_failure > 0:
        print(f"  下单成功率: {BizStats.order_success/(BizStats.order_success+ BizStats.order_failure)*100:.2f}%")

4. 分布式压测集群搭建与运维

当单机无法产生足够压力,或者你想从不同网络区域发起测试时,就需要使用分布式模式。

4.1 架构与启动命令

Locust采用主从(Master-Worker)架构。

  • Master节点 :负责分发测试任务、协调Worker、收集并汇总所有Worker的统计数据、托管Web UI。它本身不模拟任何用户。
  • Worker节点 :负责执行 locustfile.py 中的测试逻辑,生成真实的虚拟用户负载,并将统计数据实时上报给Master。

启动命令如下:

# 在Master机器上启动
locust -f locustfile.py --master --host=http://your-target-system.com

# 在每台Worker机器上启动
locust -f locustfile.py --worker --master-host=<master-ip-address>

--master-host 参数告诉Worker Master节点的IP地址。确保所有节点(Master和Workers)都能访问到 locustfile.py 和其依赖(可以通过版本控制或共享存储同步),并且都能访问到被测系统( --host 指定的目标)。

4.2 网络配置与常见踩坑点

分布式部署中最常见的问题是网络连通性。以下是一个检查清单:

  1. Master与Worker双向连通 :在Worker机器上,使用 telnet <master-ip> 5557 (或 nc -zv <master-ip> 5557 )检查是否能连接到Master的通信端口(默认5557)。同样,Master也需要能访问Worker的通信端口(默认5558)。
  2. 防火墙设置 :确保所有相关机器上的防火墙放行了5557-5558端口的TCP流量。在云服务器上,还需要检查安全组规则。
  3. 目标系统可达 :所有Worker必须能正常访问被压测的系统( --host 参数)。
  4. 时钟同步 :虽然不强制,但建议所有节点使用NTP服务进行时间同步,这有助于日志时间戳对齐,排查问题时更清晰。

踩坑实录:有一次在云环境部署,Worker一直显示“Connected”,但Master的Web UI上用户数始终为0。最后发现是云平台的安全组只配置了入站规则,忘了配置Worker到Master的回包规则(出站规则默认全通,但入站需明确)。另一个常见坑是Python环境或第三方库版本不一致,导致Worker加载脚本失败。建议使用Docker容器来封装运行环境,能完美解决一致性问题。

4.3 使用Docker容器化部署

使用Docker是管理分布式Locust集群的最佳实践,它能保证环境一致性,简化部署。

Dockerfile示例:

FROM python:3.9-slim
RUN pip install --no-cache-dir locust
WORKDIR /mnt/locust
COPY locustfile.py .
EXPOSE 8089 5557 5558

使用Docker Compose编排集群:

version: '3'
services:
  master:
    build: .
    ports:
      - "8089:8089" # Web UI端口
      - "5557:5557" # Master通信端口
    command: locust -f /mnt/locust/locustfile.py --master --host=http://host.docker.internal
    networks:
      - locust-net

  worker:
    build: .
    depends_on:
      - master
    command: locust -f /mnt/locust/locustfile.py --worker --master-host=master
    deploy:
      replicas: 4 # 启动4个worker副本
    networks:
      - locust-net

networks:
  locust-net:

运行 docker-compose up --scale worker=4 即可一键启动一个1 Master + 4 Worker的集群。注意,如果被测系统也在本地,Master命令中的 --host 需要使用 host.docker.internal 来指向宿主机。

5. 结果分析与性能瓶颈定位

压测的最终目的是获取洞察,而不仅仅是看一个“最大并发数”。Locust的Web UI提供了实时图表,但深入分析需要关注更多维度。

5.1 核心监控指标解读

  • 吞吐量(RPS, Requests Per Second) :系统每秒处理的请求数。这是衡量系统处理能力的核心指标。在负载增加时,观察RPS的曲线。理想情况下,它应随着并发用户数线性增长,直到达到系统瓶颈。如果用户数增加而RPS持平甚至下降,说明系统已过载。
  • 响应时间(Response Time) :重点关注 平均响应时间 中位数(Median) 95/99分位数(95th/99th percentile) 。平均响应时间容易受极端值影响,中位数更能代表“普通用户”的体验。95/99分位数则告诉你最慢的那5%或1%的请求有多慢,这对于评估服务稳定性至关重要。例如,API的SLA要求99%的请求在200ms内,那么你就必须关注99分位响应时间。
  • 错误率(Failures) :任何非2xx(以及可选的3xx)的HTTP状态码或未捕获的异常都会被记为失败。压测过程中错误率应始终保持在极低水平(如<0.1%)。错误率突然飙升是系统出现问题的明确信号。
  • 并发用户数(Number of Users) :Locust中展示的是 当前活跃的虚拟用户数 。你需要区分“总用户数”和“并发用户数”。在Locust里,你设置的是“峰值并发用户数”。

5.2 定位瓶颈的实战方法

当性能指标出现恶化时,需要系统性地定位瓶颈。

  1. 观察负载与指标的关系 :在Locust Web UI中,逐步增加用户数,同时观察RPS和响应时间曲线。如果响应时间开始陡增而RPS增长放缓,说明系统正在接近瓶颈。记录下这个拐点对应的用户数和RPS。
  2. 分析错误类型 :点击Web UI上的“Failures”标签,查看具体的失败请求和异常信息。是连接超时(Timeout)?还是5xx服务器内部错误?或者是4xx客户端错误(可能参数有问题)?不同的错误指向不同的排查方向(网络、服务端代码、测试脚本)。
  3. 结合系统监控 :压测工具只能看到外部表现,真正的瓶颈需要结合服务器监控来看。在压测过程中,实时监控被测服务器的:
    • CPU使用率 :是否有一核或多核持续100%?
    • 内存使用率 :是否在增长,有无内存泄漏迹象?
    • 磁盘I/O :特别是等待时间(await),是否过高?
    • 网络带宽 :是否被打满?
    • 应用层面 :应用服务器(如Gunicorn、uWSGI)的连接队列是否堆积?数据库连接池是否耗尽?慢查询日志是否激增?

例如,你观察到RPS上不去,响应时间变长,同时服务器CPU使用率却不高(比如只有30%)。这很可能意味着瓶颈不在计算,而在 I/O等待 ——可能是数据库查询慢、外部API调用延迟高、或者是磁盘读写慢。此时就应该去查数据库的监控和慢日志。

5.3 生成与解析离线报告

Web UI适合实时监控,但做最终汇报和留存需要离线报告。Locust支持生成HTML报告。

# 以无头模式运行测试,并指定运行时间和用户数,然后生成报告
locust -f locustfile.py --headless --users 100 --spawn-rate 10 --run-time 1m --host=http://your-target-system.com --html report.html

--headless 表示不启动Web UI。 --users 是峰值用户数, --spawn-rate 是每秒启动的用户数, --run-time 是总运行时间。运行结束后会生成 report.html

对于更自动化的CI/CD流程,你可能需要机器可读的格式(如JSON)。Locust默认不直接提供,但可以通过事件钩子或使用 locust-plugins 等第三方库来收集数据并输出为JSON或CSV,方便集成到你的监控分析平台中。

6. 常见问题排查与性能调优心得

6.1 Locust自身性能与资源优化

有时候,瓶颈可能不在被测系统,而在Locust压测机本身。

  • 单机虚拟用户上限 :一个Locust Worker进程(一个Python进程)能模拟的用户数受限于机器CPU和内存,以及Python GIL的限制。通常,一个CPU核心可以稳定模拟数百到一千个轻量级HTTP用户。如果用户行为复杂(思考时间长、计算多),这个数字会下降。 监控压测机的CPU和内存 ,如果接近饱和,就需要增加Worker节点或提升机器配置。
  • 优化脚本性能
    • 避免在任务循环中执行重操作 :如读取大文件、复杂的数值计算。这些操作应在 on_start 或初始化时完成。
    • 谨慎使用 time.sleep :在Locust协程中使用 time.sleep 会阻塞整个事件循环。对于固定等待,务必使用 wait_time 属性。如果需要在任务中实现复杂等待逻辑,可以使用 gevent.sleep (Locust基于gevent)。
    • 连接复用 HttpUser client 默认会保持HTTP连接( requests.Session ),这是好的。对于自定义客户端(如WebSocket),也要考虑连接池和复用。
  • “Worker已连接但用户数为0” :几乎都是脚本加载失败。去Worker节点的日志中查看错误信息。常见原因:Python语法错误、导入的模块不存在、 locustfile.py 中定义的 User 类名不对。

6.2 被测系统性能问题排查指引

当Locust报告错误率上升或响应时间变慢时,你可以按照以下清单进行排查:

现象 可能原因 排查方向
连接超时/拒绝连接 网络问题、服务器连接池耗尽、端口未监听 检查网络连通性(telnet)、服务器应用日志(如“Too many open files”)、应用进程状态。
HTTP 5xx错误 服务端应用内部错误、依赖服务故障、资源不足(CPU、内存、数据库连接) 查看应用错误日志、监控服务器资源使用率、检查数据库/缓存等下游服务状态。
HTTP 4xx错误 客户端请求错误(参数无效、身份验证失败) 检查Locust脚本中的请求参数、Headers(如Token是否过期)、URL是否正确。
响应时间慢,但CPU低 I/O等待、数据库慢查询、外部API调用慢、锁竞争 检查数据库监控(慢查询日志、活跃连接数)、外部服务响应时间、应用线程堆栈(是否存在死锁或长时间等待)。
RPS达到平台期不再增长 系统达到吞吐量瓶颈、配置限制(如Web服务器最大连接数) 综合监控所有系统资源(CPU、内存、磁盘I/O、网络带宽、数据库TPS),找到最先达到100%的资源。检查应用和中间件的配置参数(如Tomcat的maxThreads, MySQL的max_connections)。

6.3 我的实战调优笔记

  1. 预热很重要 :对于使用JVM(如Java)或存在JIT编译的系统,在开始记录正式数据前,先施加一段时间的低负载(例如,用50个用户跑1分钟),让系统“热”起来,这样得到的性能数据更稳定、更真实。
  2. 阶梯式增压 :不要一下子把用户数拉到最大值。使用 --spawn-rate 参数,让用户数缓慢、阶梯式地增加(例如,每秒增加10个用户)。这样你可以更清晰地观察到系统性能随负载变化的曲线,精准定位性能拐点。
  3. 关注中间件和数据库 :很多时候,应用服务器本身还没到极限,数据库先扛不住了。压测时一定要同步监控数据库的CPU、连接数、慢查询和锁等待情况。一个没有索引的查询,在低并发时没问题,高并发下就是灾难。
  4. 日志级别调整 :压测时,将应用和中间件的日志级别调整到WARN或ERROR,避免大量的INFO日志刷盘成为I/O瓶颈,影响性能表现。
  5. 分布式压测的数据一致性 :确保所有Worker节点的时间大致同步,否则汇总的统计图表会出现锯齿。对于需要全局唯一ID或顺序的测试数据,可以在Master初始化时生成一个池子,或者使用雪花算法等分布式ID生成方案,避免Worker之间数据冲突。

最后,记住压测的目的是发现和解决问题,而不是单纯追求一个数字。每一次压测,都应该有明确的目标(例如,验证新系统能否支撑预期流量,或找到当前系统的容量上限),并根据测试结果,形成“负载表现-资源消耗-瓶颈分析-优化建议”的完整闭环。Locust是你手中的利器,而清晰的测试策略和严谨的分析思维,才是驾驭这把利器的关键。

更多推荐