Python分布式RPA任务测试:pytest与Celery集成实战指南
1. 项目概述:为什么需要分布式RPA任务测试?
如果你正在用Python搞RPA(机器人流程自动化),尤其是处理那些需要跨系统、长时间运行或者数据量巨大的流程,那你肯定遇到过测试的麻烦。单个脚本在本地跑得挺好,一上生产环境,面对并发用户、海量数据或者复杂的依赖服务,各种稀奇古怪的问题就冒出来了。更头疼的是,RPA流程往往不是孤立的,它可能涉及调用外部API、操作数据库、处理文件队列,这些环节在测试时很难完全模拟真实场景的负载和并发。
这就是为什么我们需要把pytest和Celery集成起来。pytest是Python里最灵活、最强大的测试框架,没有之一,它的夹具(fixture)系统和插件生态能让测试代码写得既简洁又健壮。而Celery,则是Python领域处理分布式任务和异步消息队列的“老炮儿”,它能把任务分发到多个工作节点(Worker)上并行执行,完美模拟生产环境的分布式特性。把这两者结合,你就能搭建一个接近真实的测试沙盒:用pytest组织测试用例和断言逻辑,用Celery驱动任务在分布式环境中执行,从而在代码上线前,就提前发现那些只有在高并发、分布式环境下才会暴露的坑。
我自己的团队在为一个电商客户做订单自动处理RPA时,就深刻体会到了这种集成测试的价值。本地测试时,脚本处理100个订单文件毫无压力,但一到生产环境,同时有上千个订单涌入,任务队列堵塞、数据库连接池耗尽、文件锁冲突等问题全来了。后来我们就是用这套pytest-celery的集成方案,在测试环境里模拟了同样的并发压力,提前修复了十几个隐蔽的Bug。所以,无论你是开发一个复杂的财务对账机器人,还是一个爬取数据的自动化流程,这套方法都能让你的RPA项目更稳健。
2. 环境准备与核心依赖安装
工欲善其事,必先利其器。搭建这个测试环境,核心就是装对版本、配好服务。别小看这一步,版本冲突和配置错误是新手最常见的拦路虎。
2.1 Python环境与项目结构搭建
首先,我强烈建议使用虚拟环境(Virtual Environment)来隔离项目依赖。这能避免你的系统Python环境被污染,也方便不同项目使用不同版本的库。如果你用的是Python 3.3以上版本,可以直接用内置的 venv 模块。
# 创建项目目录并进入
mkdir rpa-distributed-test && cd rpa-distributed-test
# 创建虚拟环境
python -m venv venv
# 激活虚拟环境(Linux/macOS)
source venv/bin/activate
# 激活虚拟环境(Windows)
venv\Scripts\activate
激活后,你的命令行提示符前应该会出现 (venv) 字样。接下来,初始化一个标准的Python项目结构。虽然简单,但清晰的目录能让后续的测试代码和配置管理更轻松。
rpa-distributed-test/
├── src/ # 存放你的RPA核心业务代码
│ └── rpa_bot/ # 例如,你的订单处理机器人模块
├── tests/ # 存放所有测试代码
│ ├── conftest.py # pytest的共享夹具配置文件
│ ├── test_tasks.py # 针对Celery任务的单元/集成测试
│ └── integration/ # 集成测试目录
├── celery_config.py # Celery的配置文件
├── requirements.txt # 项目依赖列表
└── pytest.ini # pytest配置文件
2.2 安装核心Python库
现在来安装主角们。版本兼容性很重要,以下是我经过多个项目验证的稳定组合。
# 在激活的虚拟环境中,使用pip安装
pip install pytest==7.4.0
pip install celery==5.3.0
# 安装pytest-celery插件,它是集成的桥梁
pip install pytest-celery==1.0.0
# 安装一个消息代理(Broker),这里以Redis为例,因为它轻量且性能好
pip install redis==4.6.0
# 可选但推荐:用于测试时生成模拟数据
pip install faker==19.0.0
把依赖固化到 requirements.txt 是个好习惯:
pytest==7.4.0
celery==5.3.0
pytest-celery==1.0.0
redis==4.6.0
faker==19.0.0
注意 :
pytest-celery这个插件非常关键,它提供了一系列预定义的夹具(如celery_worker、celery_broker),能让我们在测试中轻松启动和停止Celery的工作节点与消息代理,无需手动编写复杂的启停逻辑。这是实现自动化测试的核心。
2.3 消息代理(Broker)的安装与运行
Celery需要一个消息中间件(Broker)来传递任务。虽然RabbitMQ是官方推荐且功能最全的,但对于测试环境,Redis以其安装简单、内存存储快速的特点,是更优的选择。
对于macOS用户(使用Homebrew):
brew install redis
brew services start redis
对于Ubuntu/Debian用户:
sudo apt update
sudo apt install redis-server
sudo systemctl start redis-server
sudo systemctl enable redis-server
对于Windows用户: Windows官方不提供Redis的稳定原生版本。建议使用以下任一方法:
- 使用WSL2(Windows Subsystem for Linux) :在WSL2中安装一个Ubuntu发行版,然后按照上述Linux命令安装Redis。这是最接近生产环境的方式。
- 使用Memurai :一个兼容Redis协议的Windows替代品(有开发版免费)。
- Docker :安装Docker Desktop后,一条命令运行Redis容器:
docker run -d -p 6379:6379 redis:alpine。这是我最推荐的方式,干净且隔离。
安装完成后,打开终端,运行 redis-cli ping ,如果返回 PONG ,说明Redis服务已经正常运行。
3. Celery应用与pytest夹具基础配置
环境搭好了,我们来让Celery和pytest“认识”一下。这一步的核心是创建Celery应用实例,并用pytest的夹具机制将其融入测试生命周期。
3.1 创建Celery应用与任务定义
首先,在项目根目录创建 celery_config.py ,这里集中管理Celery的配置。将配置与代码分离是良好的实践。
# celery_config.py
import os
# 使用Redis作为Broker和结果后端(Result Backend)
broker_url = os.getenv('CELERY_BROKER_URL', 'redis://localhost:6379/0')
result_backend = os.getenv('CELERY_RESULT_BACKEND', 'redis://localhost:6379/0')
# 定义任务路由、序列化方式等
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Asia/Shanghai'
enable_utc = True
# 非常重要:为测试环境设置任务始终立即执行(Eager模式),简化单元测试
# 在实际集成测试中,我们会关闭这个模式,使用真正的Worker
task_always_eager = os.getenv('CELERY_TASK_ALWAYS_EAGER', 'False').lower() == 'true'
接着,创建Celery应用。我习惯在 src 目录下创建一个 celery_app.py 文件。
# src/celery_app.py
from celery import Celery
import sys
import os
# 将项目根目录添加到Python路径,确保能导入自己的模块
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# 创建Celery应用实例,命名为'rpa_tasks'
app = Celery('rpa_tasks')
# 从配置文件加载配置
app.config_from_object('celery_config')
# 自动从注册的模块中发现任务(tasks)
# 假设我们的RPA任务写在src/rpa_bot/tasks.py中
app.autodiscover_tasks(['src.rpa_bot'])
现在,来定义一个真实的RPA任务。假设我们有一个处理CSV订单文件的机器人。
# src/rpa_bot/tasks.py
import pandas as pd
import logging
from celery import shared_task
from time import sleep
from .utils import validate_order, update_inventory # 假设有一些业务工具函数
logger = logging.getLogger(__name__)
@shared_task(bind=True, max_retries=3) # bind=True允许访问任务实例,max_retries设置重试次数
def process_order_file(self, file_path):
"""
分布式处理订单文件的Celery任务。
模拟一个耗时的RPA流程:读取CSV、验证数据、更新库存。
"""
logger.info(f"Worker开始处理文件: {file_path}")
try:
# 1. 模拟读取文件(实际项目中可能是从共享存储如S3、NFS读取)
df = pd.read_csv(file_path)
logger.info(f"成功读取文件,共{len(df)}条订单。")
processed_count = 0
errors = []
# 2. 遍历处理每一行订单
for index, row in df.iterrows():
order_id = row['order_id']
try:
# 模拟一个可能失败的业务验证
if not validate_order(row.to_dict()):
raise ValueError(f"订单 {order_id} 数据验证失败")
# 模拟一个耗时的库存更新操作
sleep(0.1) # 模拟网络或数据库IO
update_inventory(row['product_id'], row['quantity'])
processed_count += 1
except Exception as e:
error_msg = f"处理订单 {order_id} 时出错: {str(e)}"
errors.append(error_msg)
logger.error(error_msg)
# 对于关键错误,可以选择让整个任务失败,这里我们记录后继续
# 3. 汇总结果
result = {
'file_path': file_path,
'total_orders': len(df),
'processed': processed_count,
'failed': len(errors),
'errors': errors
}
logger.info(f"文件处理完成: {result}")
return result
except FileNotFoundError:
logger.error(f"文件不存在: {file_path}")
# 使用self.retry进行重试
raise self.retry(countdown=60, exc=FileNotFoundError(f"文件未就绪: {file_path}"))
except pd.errors.EmptyDataError:
logger.error(f"文件为空或格式错误: {file_path}")
return {'error': 'Empty or invalid CSV', 'file_path': file_path}
except Exception as e:
# 捕获其他未预见的异常,记录并重试
logger.exception(f"处理文件时发生未知错误: {file_path}")
raise self.retry(countdown=120, max_retries=2)
这个任务定义包含了RPA测试中常见的几个要素: 外部依赖(文件) 、 业务逻辑验证 、 模拟耗时操作 、 异常处理与重试机制 。这些都是在分布式测试中需要重点关照的点。
3.2 配置pytest夹具(Fixtures)
pytest的夹具是管理测试依赖(如数据库连接、Celery Worker)的神器。我们在 tests/conftest.py 文件中定义它们,这样所有测试文件都能共享。
# tests/conftest.py
import pytest
import tempfile
import pandas as pd
from faker import Faker
from src.celery_app import app as celery_app
# 创建一个Faker实例用于生成测试数据
fake = Faker('zh_CN')
@pytest.fixture(scope='session')
def celery_config():
"""
提供给pytest-celery插件的配置夹具。
它会覆盖celery_config.py中的设置,确保测试使用独立的环境。
"""
return {
'broker_url': 'memory://', # 使用内存Broker,测试速度最快!
'result_backend': 'cache+memory://',
'task_always_eager': False, # 集成测试中,我们需要真正的异步执行
}
@pytest.fixture(scope='session')
def celery_worker_parameters():
"""
配置Celery Worker启动参数。
比如指定并发数、日志级别等。
"""
return {
'queues': ('default', 'high_priority'), # 可以测试多队列
'concurrency': 2, # 每个Worker启动2个进程,模拟并发
'loglevel': 'INFO',
}
@pytest.fixture
def sample_order_csv():
"""
生成一个包含模拟订单数据的临时CSV文件。
使用临时文件可以保证测试的隔离性,避免残留数据影响下一次测试。
"""
data = []
for i in range(10): # 生成10条测试订单
data.append({
'order_id': fake.uuid4(),
'customer_name': fake.name(),
'product_id': fake.bothify(text='PROD-#####'),
'quantity': fake.random_int(min=1, max=5),
'unit_price': round(fake.random_number(digits=2), 2)
})
df = pd.DataFrame(data)
# 创建一个临时文件,测试结束后自动删除
with tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False) as f:
file_path = f.name
df.to_csv(file_path, index=False)
yield file_path # 将文件路径提供给测试用例
# 测试结束后,清理临时文件(NamedTemporaryFile在关闭后删除,这里用delete=False是为了控制删除时机)
import os
os.unlink(file_path)
@pytest.fixture
def invalid_order_csv():
"""
生成一个格式错误(例如为空)的CSV文件,用于测试异常路径。
"""
with tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False) as f:
file_path = f.name
# 写入一个空文件或格式混乱的内容
f.write("not,valid,csv,content\nline1,line2")
yield file_path
import os
os.unlink(file_path)
这里有几个关键点:
-
celery_config夹具 :我们使用了memory://作为Broker。这是pytest-celery插件支持的一种特殊内存Broker,它不需要运行外部的Redis或RabbitMQ服务,所有消息在内存中传递, 测试速度极快 ,且完全隔离。这是做集成测试的“作弊码”。 -
celery_worker_parameters:可以精细控制测试中Worker的行为,比如并发数。设置concurrency: 2,我们就能用单个Worker节点测试任务是否真的能并行执行。 -
sample_order_csv夹具 :使用tempfile和Faker动态生成测试数据文件。这比使用固定的测试文件更灵活,也避免了测试间的相互干扰。yield语句是夹具提供资源并在测试后清理的标准模式。
4. 编写分布式任务测试用例
有了基础设施,现在可以动手写测试了。我们将从简单的单元测试过渡到复杂的集成与并发测试。
4.1 单元测试:验证任务逻辑(Eager模式)
首先,我们测试任务函数本身的逻辑是否正确,不涉及真正的Celery Worker。这通过设置 task_always_eager = True 来实现,此时任务会像普通函数一样被立即调用。
# tests/test_tasks.py
import pytest
from unittest.mock import Mock, patch
from src.rpa_bot.tasks import process_order_file
class TestProcessOrderFileUnit:
"""针对process_order_file任务的单元测试类(使用Eager模式)"""
def test_task_success(self, sample_order_csv, monkeypatch):
"""
测试任务正常执行流程。
使用monkeypatch模拟外部依赖函数的行为。
"""
# 模拟 validate_order 始终返回True
monkeypatch.setattr('src.rpa_bot.tasks.validate_order', lambda x: True)
# 模拟 update_inventory 函数,只记录调用,不真正操作数据库
mock_update = Mock()
monkeypatch.setattr('src.rpa_bot.tasks.update_inventory', mock_update)
# 调用任务(因为CELERY_TASK_ALWAYS_EAGER在单元测试环境可设为True,这里直接调用)
# 更稳妥的方式是通过Celery app调用,但为简化,我们直接测试函数核心逻辑。
# 我们先直接测试内部逻辑,集成测试再测完整Celery流程。
from src.rpa_bot.tasks import process_order_file
# 注意:在单元测试中,我们可能直接测试被@shared_task装饰的函数,但装饰器会带来复杂度。
# 更好的实践是将核心逻辑抽离成普通函数,任务只负责调用。这里为了演示,我们假设直接测试。
# 实际上,应该测试抽离出的业务函数。我们这里演示mock场景。
pass # 具体断言逻辑在集成测试中体现
def test_task_file_not_found(self, tmp_path):
"""
测试文件不存在的异常处理与重试机制。
"""
non_existent_file = tmp_path / "ghost.csv"
# 这里需要测试Celery的重试,更适合在集成测试中用真正的worker测试。
# 单元测试可以测试装饰器配置等。
assert not non_existent_file.exists()
实操心得 :对于被
@shared_task装饰的函数,纯粹的单元测试有时会有点别扭,因为装饰器添加了额外的上下文。一个非常有效的模式是**“任务即包装器”**:将核心业务逻辑写在一个普通的Python函数里,而Celery任务只是调用这个函数。这样,你可以对这个普通函数进行彻底的单元测试,而对Celery任务的测试则聚焦在分布式特性(如重试、序列化)上。例如:# src/rpa_bot/logic.py def process_order_file_core(file_path): # 所有核心业务逻辑 pass # src/rpa_bot/tasks.py @shared_task(bind=True) def process_order_file(self, file_path): try: return process_order_file_core(file_path) except FileNotFoundError: self.retry(...)这样分离,测试起来就清晰多了。
4.2 集成测试:与真实Worker交互
这才是重头戏。我们将使用 pytest-celery 提供的夹具,启动一个真正的、在内存中运行的Celery Worker来执行我们的任务。
# tests/test_tasks.py (续)
import time
from celery.result import AsyncResult
class TestProcessOrderFileIntegration:
"""使用真实Celery Worker的集成测试"""
def test_task_submission_and_success(self, celery_worker, celery_app, sample_order_csv):
"""
测试提交任务到Worker并成功执行。
celery_worker夹具由pytest-celery提供,会自动启动和停止Worker。
"""
# 1. 提交任务到Celery队列
# 注意:这里通过celery_app.send_task来调用,确保经过完整的Celery流程
task_result = celery_app.send_task('src.rpa_bot.tasks.process_order_file',
args=[sample_order_csv])
# 或者使用更直接的方式,从已注册的任务中获取
# from src.rpa_bot.tasks import process_order_file
# task_result = process_order_file.delay(sample_order_csv)
# 2. 异步获取结果,设置超时时间
# 在测试中,我们通常使用阻塞式的get()并设置超时,避免测试无限等待
try:
result = task_result.get(timeout=30) # 等待最多30秒
except TimeoutError:
pytest.fail(f"任务执行超时: {task_result.id}")
# 3. 断言任务执行结果符合预期
assert result is not None
assert 'file_path' in result
assert result['total_orders'] == 10
assert result['processed'] == 10 # 假设我们的mock函数让所有订单都成功
assert result['failed'] == 0
assert len(result['errors']) == 0
assert task_result.status == 'SUCCESS'
def test_task_retry_on_file_not_found(self, celery_worker, celery_app, tmp_path):
"""
测试任务在文件不存在时的自动重试行为。
这是一个更高级的测试,需要模拟一个暂时不可用后来才可用的资源。
"""
ghost_file = tmp_path / "delayed_file.csv"
# 提交一个处理不存在文件的任务
task_result = process_order_file.delay(str(ghost_file))
# 等待一小段时间,让任务执行第一次尝试并失败
time.sleep(2)
# 此时任务状态应为重试中
# 注意:获取状态可能仍是PENDING或RETRY,取决于重试间隔
# 更可靠的测试是检查任务是否至少失败了一次(通过日志或监控),但这里简化处理
# 我们可以断言任务没有成功
assert task_result.status != 'SUCCESS'
# 在实际场景中,你可能需要在这里创建文件,然后断言任务最终成功。
# 这涉及到更复杂的测试协调,通常可以结合Celery的测试mocker或使用更短的重试间隔(countdown)来测试。
def test_concurrent_task_execution(self, celery_worker, celery_app, tmp_path):
"""
测试Worker的并发处理能力。
一次性提交多个任务,验证它们是否被并行处理。
"""
# 创建多个测试文件
test_files = []
for i in range(5):
file_path = tmp_path / f"orders_batch_{i}.csv"
df = pd.DataFrame([{'order_id': f'oid_{i}_{j}', 'product_id': 'p1', 'quantity': 1} for j in range(3)])
df.to_csv(file_path, index=False)
test_files.append(str(file_path))
# 记录开始时间
start_time = time.time()
# 并发提交所有任务
task_results = []
for file_path in test_files:
# 注意:每个任务内部有sleep(0.1) * 3条订单 = ~0.3秒
# 如果是单线程,5个任务串行需要至少1.5秒
result = process_order_file.delay(file_path)
task_results.append(result)
# 等待所有任务完成
final_results = []
for tr in task_results:
final_results.append(tr.get(timeout=60)) # 设置较长超时
end_time = time.time()
total_time = end_time - start_time
# 断言所有任务成功
for res in final_results:
assert res['processed'] == 3
# 关键断言:如果并发有效,总耗时应远小于串行时间(5*0.3=1.5秒)
# 由于启动开销,我们断言它小于1.2秒,证明有并行发生
# 这个值需要根据实际任务时间和并发数调整
print(f"并发执行5个任务总耗时: {total_time:.2f}秒")
# 这是一个软断言,更多是用于验证配置是否生效
assert total_time < 1.2, f"任务执行似乎未并发,耗时{total_time}秒过长。"
# 清理临时文件
for fp in test_files:
os.unlink(fp)
这个集成测试类展示了三个核心场景:
- 基础任务流 :验证任务能正常提交、执行并返回结果。
- 异常与重试 :验证Celery的重试机制是否按配置工作。
- 并发能力 :这是分布式测试的精髓。通过同时提交多个包含
sleep的任务,并计算总耗时,我们可以直观地验证celery_worker_parameters中设置的concurrency: 2是否真的让Worker同时处理了多个任务。如果总耗时接近单个任务耗时乘以任务数量,说明是串行;如果总耗时大大缩短,说明并发生效了。
5. 高级测试场景与最佳实践
掌握了基础测试后,我们可以探索一些更复杂但非常实用的场景,这些往往是生产环境RPA稳定性的关键。
5.1 测试任务依赖链(Chord/Group/Chain)
复杂的RPA流程可能由多个子任务按特定顺序组成。Celery提供了 chain (链式)、 group (分组并行)、 chord (分组后回调)等原语来编排任务。
# src/rpa_bot/tasks.py (新增任务)
@shared_task
def fetch_data_from_api(source):
sleep(0.5)
return f"data_from_{source}"
@shared_task
def transform_data(raw_data):
# 模拟数据转换
sleep(0.3)
return raw_data.upper()
@shared_task
def load_data_to_db(transformed_data):
sleep(0.4)
return f"Loaded: {transformed_data}"
@shared_task
def final_callback(results):
"""Chord的回调任务,接收group中所有任务的结果列表"""
logger.info(f"所有数据加载完成,结果: {results}")
return all('Loaded' in r for r in results)
# tests/test_workflows.py
from celery import chain, group, chord
def test_task_chain(celery_worker, celery_app):
"""测试链式任务:A -> B -> C"""
# 构建链:fetch -> transform -> load
workflow = chain(
fetch_data_from_api.s('erp_system'),
transform_data.s(),
load_data_to_db.s()
)
result = workflow.delay()
final_result = result.get(timeout=10)
assert 'Loaded: DATA_FROM_ERP_SYSTEM' in final_result
def test_task_group_and_chord(celery_worker, celery_app):
"""测试分组与和弦:并行执行多个fetch,全部完成后执行callback"""
# 构建一个和弦:(fetch1, fetch2, fetch3) -> callback
header = group([
fetch_data_from_api.s('source1'),
fetch_data_from_api.s('source2'),
fetch_data_from_api.s('source3')
])
callback = final_callback.s()
workflow = chord(header)(callback) # 另一种写法:chord(header)(callback).delay()
# 和弦的结果是回调任务的结果
chord_result = workflow.get(timeout=10)
assert chord_result is True # 根据我们的callback逻辑,应该返回True
测试任务链和和弦能确保你的复杂业务流程在分布式环境下也能正确执行顺序和同步。
5.2 模拟外部服务与隔离测试
RPA任务经常调用外部HTTP API、数据库或文件系统。在测试中,我们不应该依赖真实的外部服务。使用 pytest-mock 或 unittest.mock 来模拟(Mock)这些调用。
pip install pytest-mock
# tests/test_with_mocks.py
import requests
from unittest.mock import Mock, patch
def test_task_with_external_api(mocker, celery_worker):
"""
测试一个需要调用外部API的RPA任务。
使用mocker.patch模拟requests.get调用。
"""
# 假设我们有一个任务调用某个API
@shared_task
def fetch_user_info(user_id):
response = requests.get(f'https://api.example.com/users/{user_id}')
response.raise_for_status()
return response.json()
# 模拟requests.get返回一个预定义的响应
mock_response = Mock()
mock_response.json.return_value = {'id': 123, 'name': '测试用户'}
mock_response.raise_for_status = Mock()
mocker.patch('requests.get', return_value=mock_response)
# 现在执行任务,它将使用我们模拟的requests.get
result = fetch_user_info.delay(123).get(timeout=5)
assert result['name'] == '测试用户'
# 验证模拟函数是否被以正确的参数调用
requests.get.assert_called_once_with('https://api.example.com/users/123')
注意事项 :模拟(Mock)的粒度要把握好。过度模拟会导致测试与实现细节耦合过紧(比如你模拟了一个函数内部的某个具体调用),一旦重构代码,测试就容易失败。更好的做法是模拟 外部边界 ,比如模拟一个你自定义的、封装了API调用的客户端类,而不是直接模拟
requests.get。这样即使内部从requests换成了httpx,测试也无需改动。
5.3 性能与压力测试雏形
虽然pytest-celery不是专业的压测工具,但我们可以用它来验证任务队列在持续负载下的行为,比如是否有内存泄漏、任务堆积情况。
# tests/test_performance.py
import statistics
def test_bulk_task_submission(celery_worker, celery_app, tmp_path):
"""批量提交大量任务,观察执行情况"""
num_tasks = 50
task_results = []
file_paths = []
# 准备50个测试文件
for i in range(num_tasks):
fp = tmp_path / f"stress_{i}.csv"
pd.DataFrame([{'order_id': f'stress_{i}', 'q': 1}]).to_csv(fp, index=False)
file_paths.append(str(fp))
import time
start = time.time()
# 快速提交所有任务
for fp in file_paths:
task_results.append(process_order_file.delay(fp))
# 等待所有任务完成,收集耗时
durations = []
for tr in task_results:
try:
tr.get(timeout=60) # 每个任务单独超时
# 可以在这里记录每个任务的状态、耗时(需要任务本身记录或通过事件监控)
except TimeoutError:
print(f"任务超时: {tr.id}")
finally:
# 简化处理,不记录详细耗时
pass
end = time.time()
total_duration = end - start
print(f"提交并完成 {num_tasks} 个任务,总耗时: {total_duration:.2f}秒")
print(f"平均每个任务耗时(包含队列等待): {total_duration/num_tasks:.2f}秒")
# 清理
for fp in file_paths:
os.unlink(fp)
# 一个简单的断言:确保没有任务失败(假设所有文件都有效)
# 在实际中,你需要检查task_results中每个结果的状态
success_count = sum(1 for tr in task_results if tr.successful())
assert success_count == num_tasks, f"有{num_tasks - success_count}个任务失败"
这个测试可以帮你发现一些配置问题,比如 Worker的并发数是否足够 、 任务是否有死锁 、 Redis连接池是否耗尽 。如果发现任务完成速度远低于预期,或者大量任务超时,就需要回头检查Celery的 worker_concurrency 、 broker_pool_limit 等配置了。
6. 持续集成(CI)集成与测试报告
让这套测试自动化跑起来,才能持续发挥价值。集成到CI/CD流水线中是下一步。
6.1 编写GitHub Actions工作流
在项目根目录创建 .github/workflows/test.yml 。
name: RPA Tests
on: [push, pull_request]
jobs:
test:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ['3.9', '3.10', '3.11'] # 测试多个Python版本兼容性
steps:
- uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
# 不需要单独安装Redis服务,因为pytest-celery测试用了memory broker
- name: Lint with flake8 (可选)
run: |
pip install flake8
flake8 src tests --count --max-complexity=10 --statistics
- name: Test with pytest
run: |
# 设置环境变量,确保测试使用内存Broker
export CELERY_BROKER_URL=memory://
export CELERY_RESULT_BACKEND=cache+memory://
pytest tests/ -v --cov=src --cov-report=xml --tb=short
- name: Upload coverage to Codecov (可选)
uses: codecov/codecov-action@v3
with:
file: ./coverage.xml
fail_ci_if_error: false
这个工作流会在每次代码推送或拉取请求时触发,在Ubuntu环境下用多个Python版本运行测试,并生成测试覆盖率报告。
6.2 生成丰富的测试报告
使用pytest的插件可以生成更易读的报告。
# 安装报告插件
pip install pytest-html pytest-cov
# 运行测试并生成HTML报告和覆盖率报告
pytest tests/ -v --html=report.html --self-contained-html --cov=src --cov-report=html:cov_html
运行后,会生成一个 report.html 文件,里面详细列出了每个测试用例的执行结果、通过/失败状态、甚至日志输出。 cov_html 目录下则是一个详细的代码覆盖率报告,告诉你哪些代码行被测试覆盖了,哪些没有。把这些报告归档到CI流水线的产物中,对于分析测试失败原因和评估测试完整性非常有帮助。
7. 常见问题排查与调试技巧
在实际操作中,你肯定会遇到各种问题。这里记录了一些我踩过的坑和解决方法。
7.1 Worker启动失败或任务不被执行
症状 :测试运行时,任务一直处于 PENDING 状态,或者日志显示Worker无法连接Broker。
- 检查1:Broker URL配置 :确保
celery_config夹具或环境变量中的broker_url正确。在测试中,使用memory://是最省事的。如果要用真实的Redis,确保Redis服务已启动且可连接(redis-cli ping)。 - 检查2:任务模块发现 :确认Celery应用的
app.autodiscover_tasks()路径包含了你的任务模块。在测试中,由于路径问题,有时需要手动导入任务模块:在conftest.py或测试文件开头添加import src.rpa_bot.tasks。 - 检查3:Worker日志 :在测试中,可以通过配置
celery_worker_parameters中的loglevel为DEBUG来获取更详细的日志。有时任务序列化失败(比如传入了不可Pickle的对象)也会导致静默失败,DEBUG日志能帮你发现。
7.2 任务超时(TimeoutError)
症状 : result.get(timeout=10) 抛出 TimeoutError 。
- 原因1:任务执行时间过长 :测试环境的任务模拟了
sleep,可能比预期更久。增加get()的timeout参数值。 - 原因2:Worker并发数不足,任务排队 :如果一次性提交了大量任务,而Worker的
concurrency设置得很小(比如为1),任务就会排队。在性能测试中,适当增加celery_worker_parameters中的concurrency值(例如设置为CPU核心数)。 - 原因3:任务本身卡死或死锁 :检查任务代码中是否有无限循环、死锁(如共享资源竞争)或对外部服务的同步调用(应改为异步)。在测试中,可以使用
timeout参数并捕获异常,然后通过result.state查看任务状态,或者使用app.control.inspect()来检查Worker的活动和预留任务。
7.3 测试数据污染与隔离
症状 :测试A通过后,测试B莫名其妙失败,或者测试结果不稳定。
- 最佳实践:使用夹具和临时资源 :就像我们之前用
tempfile.NamedTemporaryFile和yield一样,确保每个测试使用的文件、数据库连接都是独立的,并在测试后清理。对于数据库,可以使用pytest-django或pytest-sqlalchemy等插件来管理事务回滚。 - 内存Broker的隔离 :
pytest-celery的memory://broker在默认情况下为每个测试会话(scope='session')创建一个独立的Broker。但如果你将celery_config夹具的scope设为function(默认),那么每个测试函数都会获得一个全新的、空的消息队列,实现了完美的隔离。这是它相对于使用外部Redis的巨大优势。
7.4 在测试中查看和监控任务状态
有时你需要调试,想知道任务到底在哪一步卡住了。
def test_debug_task_state(celery_app, celery_worker):
"""演示如何检查任务状态"""
task_result = process_order_file.delay('dummy_path')
# 方法1:直接检查状态属性
print(f"初始状态: {task_result.state}") # 可能是 PENDING, STARTED, SUCCESS, FAILURE, RETRY
print(f"任务ID: {task_result.id}")
# 方法2:使用Celery的检查(inspect)API(需要Worker运行)
# 这能查看Worker当前正在执行什么任务、队列里有哪些任务等。
inspector = celery_app.control.inspect()
# 查看活跃的任务
active_tasks = inspector.active()
# 查看预留(已预取)的任务
reserved_tasks = inspector.reserved()
# 查看已注册的任务
registered_tasks = inspector.registered()
# 注意:在测试中使用inspect API可能因为Worker是临时启动的而获取不到信息,
# 或者获取的信息是上一次测试残留的(如果broker未隔离)。在集成测试中谨慎使用。
把这些调试技巧封装成一个辅助函数或夹具,在遇到棘手的测试失败时能帮你快速定位问题。
走到这里,你已经拥有了一个覆盖单元、集成、并发和部分性能场景的自动化测试套件。它不仅能验证你的RPA任务逻辑是否正确,更能模拟生产环境的分布式特性,提前发现并发竞争、资源死锁、异常恢复等深层问题。记住,自动化测试不是一次性的工作,随着RPA流程的复杂化,你需要不断补充新的测试用例,特别是那些针对业务异常流和边界条件的测试。把这套流程融入到你的开发习惯和CI/CD管道中,它将成为你交付稳定、可靠RPA解决方案最坚实的保障。
更多推荐
所有评论(0)