考虑延时和定时消息,是因为遇到了一个业务场景:

前置任务完成时发送消息,但因为一些业务原因,不希望消息马上被消费,因此需要设置延时。

几种解决思路

考虑了几种实现方式:

  1. 消息队列的延迟消息,目前仅有RocketeMQ等少量产品支持。
  2. Java异步线程
  3. 持久化定时任务

因为目前业务量不大、尽可能实现简单、快速,最终选择了方案一,理由如下:

  • 异步线程虽然实现简单,但有消息未发送风险,如线程等待期间、进程突然crash。而ONS不依赖java进程、无该问题,后面会讲到。
  • 持久化定时任务可以解决消息丢失、任务争抢等问题,但要实现可靠方案,复杂度比较高、要花费很多时间。
  • 消息队列虽然也可能出现生产丢信的风险,但已经做了一定的保证,相比异步线程风险更低,可以接受。

以后业务量大或要求高了,再改成方案三,或者方案一结合方案三:

  • 性能相对有保障;而消息队列延时消息可能出现存储和消费性能问题
    • 消息/任务吞吐量,可以通过增加节点快速扩容
    • 存储上可以单独规划、快速扩容
  • 不用担心消息堆积时、宕机后运行时态会丢失

简单介绍下几种方案的实现。

实现方案

ONS延迟消息

据官方说法,ONS是基于RocketMQ实现的产品,但和开源RocketMQ的api上有一些不同。

RocketMQ

RocketMQ的消息(org.apache.rocketmq.common.message.Message),通过指定延迟级别来设置延时:

...
// 10秒后传递给consumer
message.setDelayTimeLevel(3);
...

该方式仅支持特定的level,如1s/5s/10s/1m等,不能任意设置时间。level入参的定义,0表示不延时1表示1级延时(1s)2 表示2级延时(5s),以此类推。

  • 默认的level定义:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
  • 在服务器端(rocketmq-broker端)的属性配置文中,可以调整messageDelayLevel,实现自定义,而且支持smhd,分别表示秒、分、时、天。

    我们用的ONS,没有该配置。但推测自己部署的情况,维护该配置也是一个麻烦事

延迟队列的实现思路:

  1. producer发出消息
  2. broker在准备将消息写入存储的时候,延时消息会更改Messagetopic为延时消息队列的topic,也就是将消息投递到延时消息队列。不同延迟级别、对应不同队列。
  3. 定时线程不断读取队列,延迟时间到了,就转换为普通的消息,存到真实的topic下。此时consumer才能看到并消费该消息

因为延迟逻辑在broker处理,并不依赖Java进程。

ONS

ONS消息(com.aliyun.openservices.ons.api.Message)通过setStartDeliverTime设置延时:

...
// 10s后传递给consumer
message.setStartDeliverTime(System.currentTimeMillis() + 10 * 1000);
...

该方式实际上是定时消息,因为设置的是绝对时间,也就是说除了当前时间延后多久,还可以通过指定具体某个时间来设置:

...
// 设置2019-07-24 12:30:00投递
message.setStartDeliverTime(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2019-07-24 12:30:00").getTime());
...

注意最大延迟时间7天。

Java异步线程

适合消息队列不支持定时效期,且业务要求不严谨、但要求尽快实现的情况。

// 延迟10s执行
Thread async = new Thread(() -> {
    try{
        Thread.sleep(10 * 1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("async done");
}, "async");
async.start();
// 注意不要用run方法,会立即使用当前cpu时间片开始执行
// async.run();

注意:实际使用时,需通过线程池创建线程,避免创建太多线程抢占CPU资源,影响主业务
关于Java线程的常见用法,可参考:《如何创建Java线程》《如何启动Java线程》

如前面所说,考虑到极端情况下进程崩溃、异步线程未执行的风险,业务要求严谨的场景,不建议采用该方式。

持久化定时任务

这是比较常规、传统的方式,很多定时的、业务复杂的任务都采用该方案。
任务并发量较大的情况,这种方式还是直接有效的。只是有一定实现复杂度,主要在调度、任务争抢方面。
我们是通过关系数据库来实现的持久化任务,任务实体包括任务类型、计划执行时间、状态、锁,还有一些扩展字段如数据过滤表达式。


以上。欢迎您的阅读。

另附阿里云产品的优惠链接,有需要的自取。

Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐