在 Node.js 中使用 Bull 进行异步任务处理
由Godwin Ekuma撰写✏️ 在处理来自 API 客户端的请求时,您可能会遇到这样一种情况,即请求启动 CPU 密集型操作,这可能会阻止其他请求。您可以通过在称为队列的处理器中添加有关任务的信息,将其推迟到将来处理,而不是立即处理此类任务并阻止其他请求。然后,任务消费者将从队列中提取任务并进行处理。 队列有助于以优雅的方式解决常见的应用程序扩展和性能挑战。根据NestJS 文档,队列可以帮助
由Godwin Ekuma撰写✏️
在处理来自 API 客户端的请求时,您可能会遇到这样一种情况,即请求启动 CPU 密集型操作,这可能会阻止其他请求。您可以通过在称为队列的处理器中添加有关任务的信息,将其推迟到将来处理,而不是立即处理此类任务并阻止其他请求。然后,任务消费者将从队列中提取任务并进行处理。
队列有助于以优雅的方式解决常见的应用程序扩展和性能挑战。根据NestJS 文档,队列可以帮助解决的问题示例包括:
-
平滑处理峰值
-
分解可能会阻塞 Node.js 事件循环的单一任务
-
提供跨各种服务的可靠通信渠道
Bull是一个 Node 库,它基于Redis实现了一个快速、健壮的队列系统。虽然可以直接使用 Redis 命令实现队列,但 Bull 是 Redis 之上的抽象/包装器。它提供了一个处理所有底层细节的 API,并丰富了 Redis 的基本功能,从而可以轻松处理更复杂的用例。
安装
在我们开始使用 Bull 之前,我们需要安装 Redis。按照Redis Labs指南安装 Redis,然后使用 npm 或 yarn 安装 Bull。
npm install bull --save
进入全屏模式 退出全屏模式
或者:
yarn add bull
进入全屏模式 退出全屏模式
创建队列
通过实例化 Bull 的新实例来创建队列。
语法
Queue(queueName: string, url?: string, opts?: QueueOptions): Queue
进入全屏模式 退出全屏模式
可选的url
参数用于指定 Redis 连接字符串。如果没有指定url
,bull 将尝试连接到运行在localhost:6379
上的默认 Redis 服务器
QueueOptions
接口
interface QueueOptions {
limiter?: RateLimiter;
redis?: RedisOpts;
prefix?: string = 'bull'; // prefix for all queue keys.
defaultJobOptions?: JobOpts;
settings?: AdvancedSettings;
}
进入全屏模式 退出全屏模式
RateLimiter
limiter:RateLimiter
是QueueOptions
中的可选字段,用于配置一次可以处理的作业的最大数量和持续时间。有关详细信息,请参阅RateLimiter。
RedisOption
redis: RedisOpts
也是QueueOptions
中的一个可选字段。它是 Redisurl
字符串的替代品。有关详细信息,请参阅RedisOpts
。
AdvancedSettings
settings: AdvancedSettings
是一种高级队列配置设置。它是可选的,Bull 警告说,除非您对队列的内部有很好的了解,否则不应覆盖默认的高级设置。有关详细信息,请参阅高级设置。
基本队列如下所示:
const Queue = require(bull);
const videoQueue - new Queue('video');
进入全屏模式 退出全屏模式
使用QueueOptions
创建队列
// limit the queue to a maximum of 100 jobs per 10 seconds
const Queue = require(bull);
const videoQueue - new Queue('video', {
limiter: {
max: 100,
duration: 10000
}
});
进入全屏模式 退出全屏模式
每个队列实例可以执行三个不同的角色:作业生产者、作业消费者和/或事件侦听器。每个队列可以有一个或多个生产者、消费者和侦听器。
生产商
作业生产者创建任务并将其添加到队列实例。 Redis 只存储序列化的数据,所以任务应该作为 JavaScript 对象加入队列,这是一种可序列化的数据格式。
add(name?: string, data: object, opts?: JobOpts): Promise<Job>
进入全屏模式 退出全屏模式
如果队列为空,任务将立即执行。否则,一旦处理器空闲或基于任务优先级,任务将被添加到队列中并执行。
您可以添加可选的名称参数以确保只有使用特定名称定义的处理器才能执行任务。一个命名的作业必须有一个对应的命名消费者。否则,队列将抱怨您缺少给定作业的处理器。
工作选择
作业可以具有与其关联的其他选项。在add()
方法中的数据参数之后传递一个选项对象。
工作选项属性包括:
interface JobOpts {
priority: number; // Optional priority value. ranges from 1 (highest priority) to MAX_INT (lowest priority). Note that
// using priorities has a slight impact on performance, so do not use it if not required.
delay: number; // An amount of miliseconds to wait until this job can be processed. Note that for accurate delays, both
// server and clients should have their clocks synchronized. [optional].
attempts: number; // The total number of attempts to try the job until it completes.
repeat: RepeatOpts; // Repeat job according to a cron specification.
backoff: number | BackoffOpts; // Backoff setting for automatic retries if the job fails
lifo: boolean; // if true, adds the job to the right of the queue instead of the left (default false)
timeout: number; // The number of milliseconds after which the job should be fail with a timeout error [optional]
jobId: number | string; // Override the job ID - by default, the job ID is a unique
// integer, but you can use this setting to override it.
// If you use this option, it is up to you to ensure the
// jobId is unique. If you attempt to add a job with an id that
// already exists, it will not be added.
removeOnComplete: boolean | number; // If true, removes the job when it successfully
// completes. A number specified the amount of jobs to keep. Default behavior is to keep the job in the completed set.
removeOnFail: boolean | number; // If true, removes the job when it fails after all attempts. A number specified the amount of jobs to keep
// Default behavior is to keep the job in the failed set.
stackTraceLimit: number; // Limits the amount of stack trace lines that will be recorded in the stacktrace.
}
interface RepeatOpts {
cron?: string; // Cron string
tz?: string; // Timezone
startDate?: Date | string | number; // Start date when the repeat job should start repeating (only with cron).
endDate?: Date | string | number; // End date when the repeat job should stop repeating.
limit?: number; // Number of times the job should repeat at max.
every?: number; // Repeat every millis (cron setting cannot be used together with this setting.)
count?: number; // The start value for the repeat iteration count.
}
interface BackoffOpts {
type: string; // Backoff type, which can be either `fixed` or `exponential`. A custom backoff strategy can also be specified in `backoffStrategies` on the queue settings.
delay: number; // Backoff delay, in milliseconds.
}
进入全屏模式 退出全屏模式
一个基本的生产者应该是这样的:
const videoQueue - new Queue('video')
videoQueue.add({video: 'video.mp4'})
进入全屏模式 退出全屏模式
命名作业可以这样定义:
videoQueue.add('video'. {input: 'video.mp4'})
进入全屏模式 退出全屏模式
下面是使用作业选项自定义作业的示例。
videoQueue.add('video'. {input: 'video.mp4'}, {delay: 3000, attempts: 5, lifo: true, timeout: 10000 })
进入全屏模式 退出全屏模式
消费者
工作消费者,也称为工作者,定义了一个过程函数(处理器)。 process 函数负责处理队列中的每个作业。
process(processor: ((job, done?) => Promise<any>) | string)
进入全屏模式 退出全屏模式
如果队列为空,则一旦将作业添加到队列中,就会调用 process 函数。否则,每次worker空闲并且队列中有作业要处理时都会调用它。
流程函数将作业实例作为第一个参数传递。作业包括流程功能处理任务所需的所有相关数据。数据包含在作业对象的data
属性中。作业还包含方法,例如用于报告作业进度的progress(progress?: number)
、用于向该作业特定作业添加日志行的log(row: string)
、moveToCompleted
、moveToFailed
等。
Bull 按照添加到队列中的顺序处理作业。如果您希望并行处理作业,请指定concurrency
参数。 Bull 然后将并行调用工人,尊重RateLimiter
的最大值。
process(concurrency: number, processor: ((job, done?) => Promise<any>) | string)
进入全屏模式 退出全屏模式
如上所示,可以命名作业。命名作业只能由命名处理器处理。通过在 process 函数中指定 name 参数来定义命名处理器。
process(name: string, concurrency: number, processor: ((job, done?) => Promise<any>) | string)
进入全屏模式 退出全屏模式
事件监听器
在队列和/或作业的整个生命周期中,Bull 会发出有用的事件,您可以使用事件侦听器来侦听这些事件。事件可以是给定队列实例(工作者)的本地事件。本地事件的侦听器将仅接收在给定队列实例中生成的通知。
以下是本地进度事件。
queue.on('progress', function(job, progress){
console.log(`${jod.id} is in progress`)
})
进入全屏模式 退出全屏模式
其他可能的事件类型包括error
、waiting
、active
、stalled
、completed
、failed
、paused
、resumed
、cleaned
、drained
和removed
。
通过在本地事件名称前加上global:
前缀,您可以监听给定队列中所有工作人员产生的所有事件。
下面是一个全球进步事件。
queue.on('global:progress', function(jobId){
console.log(`${jobId} is in progress`)
})
进入全屏模式 退出全屏模式
请注意,对于全局事件,传递的是jobId
而不是作业对象。
一个实例
假设一家电子商务公司希望鼓励客户在其市场上购买新产品。该公司决定为用户添加一个选项,让他们选择接收有关新产品的电子邮件。
因为传出电子邮件是可能具有非常高延迟和失败的互联网服务之一,所以我们需要将针对新市场到达的电子邮件发送行为排除在这些操作的典型代码流之外。为此,我们将使用任务队列来记录需要向谁发送电子邮件。
const Queue = require('bull');
const sgMail = require('@sendgrid/mail');
sgMail.setApiKey(process.env.SENDGRID_API_KEY);
export class EmailQueue{
constructor(){
// initialize queue
this.queue = new Queue('marketplaceArrival');
// add a worker
this.queue.process('email', job => {
this.sendEmail(job)
})
}
addEmailToQueue(data){
this.queue.add('email', data)
}
async sendEmail(job){
const { to, from, subject, text, html} = job.data;
const msg = {
to,
from,
subject,
text,
html
};
try {
await sgMail.send(msg)
job.moveToCompleted('done', true)
} catch (error) {
if (error.response) {
job.moveToFailed({message: 'job failed'})
}
}
}
}
进入全屏模式 退出全屏模式
结论
到目前为止,您应该对 Bull 所做的事情以及如何使用它有一个扎实的、基本的了解。
要了解有关使用 Bull 实现任务队列的更多信息,请查看GitHub上的一些常见模式。
仅 200 个 u200e✅:监控生产中失败和缓慢的网络请求
部署基于节点的 Web 应用程序或网站是很容易的部分。确保您的 Node 实例继续为您的应用程序提供资源是事情变得更加困难的地方。如果您有兴趣确保对后端或第三方服务的请求成功,请尝试使用 LogRocket。
[](https://res.cloudinary.com/practicaldev/image/fetch/s--UHOs84WY--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://thepracticaldev. s3.amazonaws.com/i/jd2amkhuiynfjf3i33qk.png)
LogRocket就像一个用于网络应用程序的 DVR,几乎可以记录您网站上发生的所有事情。无需猜测问题发生的原因,您可以汇总和报告有问题的 GraphQL 请求,以快速了解根本原因。此外,您可以跟踪 Apollo 客户端状态并检查 GraphQL 查询的键值对。
LogRocket 检测您的应用程序以记录基准性能时间,例如页面加载时间、第一个字节的时间、缓慢的网络请求,并记录 Redux、NgRx 和 Vuex 操作/状态。免费开始监控。
帖子Asynchronous task processing in Node.js with Bull首先出现在LogRocket Blog上。
更多推荐
所有评论(0)