阿里云ONS / RocketMQ的定时消息 / 延时消息
实现定时消息(延时消息),比较阿里云ONS、开源RocketMQ、Java异步线程、持久化定时任务等几种方式
考虑延时和定时消息,是因为遇到了一个业务场景:
前置任务完成时发送消息,但因为一些业务原因,不希望消息马上被消费,因此需要设置延时。
几种解决思路
考虑了几种实现方式:
- 消息队列的延迟消息,目前仅有RocketeMQ等少量产品支持。
- Java异步线程
- 持久化定时任务
因为目前业务量不大、尽可能实现简单、快速,最终选择了方案一,理由如下:
- 异步线程虽然实现简单,但有消息未发送风险,如线程等待期间、进程突然crash。而ONS不依赖java进程、无该问题,后面会讲到。
- 持久化定时任务可以解决消息丢失、任务争抢等问题,但要实现可靠方案,复杂度比较高、要花费很多时间。
- 消息队列虽然也可能出现生产丢信的风险,但已经做了一定的保证,相比异步线程风险更低,可以接受。
以后业务量大或要求高了,再改成方案三,或者方案一结合方案三:
- 性能相对有保障;而消息队列延时消息可能出现存储和消费性能问题
- 消息/任务吞吐量,可以通过增加节点快速扩容
- 存储上可以单独规划、快速扩容
- 不用担心消息堆积时、宕机后运行时态会丢失
简单介绍下几种方案的实现。
实现方案
ONS延迟消息
据官方说法,ONS是基于RocketMQ实现的产品,但和开源RocketMQ的api上有一些不同。
RocketMQ
RocketMQ
的消息(org.apache.rocketmq.common.message.Message
),通过指定延迟级别来设置延时:
...
// 10秒后传递给consumer
message.setDelayTimeLevel(3);
...
该方式仅支持特定的level
,如1s/5s/10s/1m
等,不能任意设置时间。level
入参的定义,0
表示不延时
,1
表示1级延时(1s)
,2
表示2级延时(5s)
,以此类推。
- 默认的
level
定义:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
- 在服务器端(
rocketmq-broker
端)的属性配置文中,可以调整messageDelayLevel
,实现自定义,而且支持s
、m
、h
、d
,分别表示秒、分、时、天。我们用的ONS,没有该配置。但推测自己部署的情况,维护该配置也是一个麻烦事
延迟队列的实现思路:
producer
发出消息broker
在准备将消息写入存储的时候,延时消息会更改Message
的topic
为延时消息队列的topic
,也就是将消息投递到延时消息队列。不同延迟级别、对应不同队列。- 定时线程不断读取队列,延迟时间到了,就转换为普通的消息,存到真实的
topic
下。此时consumer
才能看到并消费该消息
因为延迟逻辑在broker处理,并不依赖Java进程。
ONS
ONS
消息(com.aliyun.openservices.ons.api.Message
)通过setStartDeliverTime
设置延时:
...
// 10s后传递给consumer
message.setStartDeliverTime(System.currentTimeMillis() + 10 * 1000);
...
该方式实际上是定时消息,因为设置的是绝对时间,也就是说除了当前时间延后多久,还可以通过指定具体某个时间来设置:
...
// 设置2019-07-24 12:30:00投递
message.setStartDeliverTime(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2019-07-24 12:30:00").getTime());
...
注意最大延迟时间为7
天。
Java异步线程
适合消息队列不支持定时效期,且业务要求不严谨、但要求尽快实现的情况。
// 延迟10s执行
Thread async = new Thread(() -> {
try{
Thread.sleep(10 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("async done");
}, "async");
async.start();
// 注意不要用run方法,会立即使用当前cpu时间片开始执行
// async.run();
注意:实际使用时,需通过线程池创建线程,避免创建太多线程抢占CPU资源,影响主业务
关于Java线程的常见用法,可参考:《如何创建Java线程》、《如何启动Java线程》。
如前面所说,考虑到极端情况下进程崩溃、异步线程未执行的风险,业务要求严谨的场景,不建议采用该方式。
持久化定时任务
这是比较常规、传统的方式,很多定时的、业务复杂的任务都采用该方案。
任务并发量较大的情况,这种方式还是直接有效的。只是有一定实现复杂度,主要在调度、任务争抢方面。
我们是通过关系数据库来实现的持久化任务,任务实体包括任务类型、计划执行时间、状态、锁,还有一些扩展字段如数据过滤表达式。
以上。欢迎您的阅读。
另附阿里云产品的优惠链接,有需要的自取。
更多推荐
所有评论(0)