XXL-JOB

常见任务调度

单机:Timer、ExectorService、spring@scheduled

分布式:xxl-job、quartz、elastic-job

原生定时任务的先天缺陷

XXL-JOB简介

由调度中心和执行器组成,调度中心提供一个web管理配置任务和执行器,调度中心通过rpc触发执行器

1、简单:支持通过Web页面对任务进行CRUD操作,操作简单,一分钟上手;

2、动态:支持动态修改任务状态、启动/停止任务,以及终止运行中任务,即时生效;

3、路由策略:执行器集群部署时提供丰富的路由策略,包括:第一个、最后一个、轮询、随机、一致性HASH、最不经常使用、最近最久未使用、故障转移、忙碌转移等;

4、故障转移:任务路由策略选择”故障转移”情况下,如果执行器集群中某一台机器故障,将会自动Failover切换到一台正常的执行器发送调度请求。

5、任务超时控制:支持自定义任务超时时间,任务运行超时将会主动中断任务

6、一致性:“调度中心”通过DB锁保证集群分布式调度的一致性, 一次任务调度只会触发一次执行

7、邮件报警:任务失败时支持邮件报警,支持配置多邮件地址群发报警邮件;

8、任务进度监控:支持实时监控任务进度;

架构图

集成XXL-JOB配置

xxl-job调度中心配置内容说明:

### 调度中心JDBC链接:链接地址
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai
spring.datasource.username=root
spring.datasource.password=root_pwd
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
### 报警邮箱
spring.mail.host=smtp.qq.com
spring.mail.port=25
spring.mail.username=xxx@qq.com
spring.mail.password=xxx
spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.smtp.starttls.enable=true
spring.mail.properties.mail.smtp.starttls.required=true
spring.mail.properties.mail.smtp.socketFactory.class=javax.net.ssl.SSLSocketFactory
### 调度中心通讯TOKEN [选填]:非空时启用;
xxl.job.accessToken=
### 调度中心国际化配置 [必填]: 默认为 "zh_CN"/中文简体, 可选范围为 "zh_CN"/中文简体, "zh_TC"/中文繁体 and "en"/英文;
xxl.job.i18n=zh_CN
## 调度线程池最大线程配置【必填】
xxl.job.triggerpool.fast.max=200
xxl.job.triggerpool.slow.max=100
### 调度中心日志表数据保存天数 [必填]:过期日志自动清理;限制大于等于7时生效,否则, 如-1,关闭自动清理功能;
xxl.job.logretentiondays=30

执行器配置:

### 调度中心部署根地址 [选填]:如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调";为空则关闭自动注册;
xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin
### 执行器通讯TOKEN [选填]:非空时启用;
xxl.job.accessToken=
### 执行器AppName [选填]:执行器心跳注册分组依据;为空则关闭自动注册
xxl.job.executor.appname=xxl-job-executor-sample
### 执行器注册 [选填]:优先使用该配置作为注册地址,为空时使用内嵌服务 ”IP:PORT“ 作为注册地址。从而更灵活的支持容器类型执行器动态IP和动态映射端口问题。
xxl.job.executor.address=
### 执行器IP [选填]:默认为空表示自动获取IP,多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯实用;地址信息用于 "执行器注册" 和 "调度中心请求并触发任务";
xxl.job.executor.ip=
### 执行器端口号 [选填]:小于等于0则自动获取;默认端口为9999,单机部署多个执行器时,注意要配置不同执行器端口;
xxl.job.executor.port=9999
### 执行器运行日志文件存储磁盘路径 [选填] :需要对该路径拥有读写权限;为空则使用默认路径;
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
### 执行器日志文件保存天数 [选填] : 过期日志自动清理, 限制值大于等于3时生效; 否则, 如-1, 关闭自动清理功能;
xxl.job.executor.logretentiondays=30

组件:

    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        logger.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname(appname);
        xxlJobSpringExecutor.setAddress(address);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
        return xxlJobSpringExecutor;
    }

阻塞处理策略

1.单机串行(默认)

调度请求进入单机执行器后,调度请求进入队列并以串行方式运行(上一次还没执行完毕,后面的排队等待)

2.丢弃后续调度(推荐)

调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败

3.覆盖之前的调度(不推荐)

调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本地调度任务

超时时间

如果任务执行的时间超过了设置的超时时间,那么任务会被打断,停止执行

路由策略

路由策略,核心就是xxljob 到底去哪台服务器找任务执行

第一个

当选择该策略时,会选择执行器注册地址的第一台机器执行,如果第一台机器出现故障,则调度任务失败。

使用注册地址列表中的第一个address

最后一个

当选择该策略时,会选择执行器注册地址的最后一台机器执行,如果机器出现故障,则调度任务失败。

轮询

当选择该策略时,会按照执行器注册地址轮询分配任务,如果其中一台机器出现故障,调度任务失败,任务不会转移

一致性HASH

分组下机器地址相同,不同JOB均匀散列在不同机器上,保证分组下机器分配JOB平均;且每个JOB固定调度其中一台机器;

当选择该策略时,每个任务按照Hash算法固定选择某一台机器。如果那台机器出现故障,调度任务失败,任务不会转移。

最不经常使用

对每一个使用的地址进行保存次数,之后按照次数进行从小到大的排序,取出次数小的进行使用,再给当前次数加一,一直这样操作

最近最久未使用

将好久不使用的排到最前面

故障转移

当选择该策略时,按照顺序依次进行心跳检测,如果其中一台机器出现故障,则会转移到下一个执行器,若心跳检测成功,会选定为目标执行器并发起调度

忙碌转移

当要执行的机器正忙(无可执行线程/需要排队),则使用下一个address执行。

当选择该策略时,按照顺序依次进行空闲检测,如果其中一台机器出现故障,则会转移到下一个执行器,若空闲检测成功,会选定为目标执行器并发起调度。

分片广播

这个模式下会对所有的执行器广播这个任务,所有的节点都会接收到调用请求,每个节点可以根据总分片数和当前任务的索引进行相关业务处理

    @XxlJob("testDemo2")
    public ReturnT testDemo2() {
        //index:当前分片序号(从0开始),执行器集群列表中当前执行器的序号
        int index = XxlJobHelper.getShardIndex();
        //total:总分片数,执行器集群的总机器数量
        int total = XxlJobHelper.getShardTotal();
        List<User> dates = query();
        for (User user : dates) {
            if (user.getId().hashCode() % total == index) {
                log.info("当前分片机器序号:{},处理的数据:{}", index, JSONObject.toJSONString(user));
            }
        }
        //返回参数  可用于查看结果、执行成功数等
        XxlJobHelper.handleSuccess("成功");
        return ReturnT.SUCCESS;
    }

调度过期策略

间隔多长时间查询一次数据库合适?

当有查询出了可调度的定时任务时,如果执行了调度且调度时间小于1s时,就会等待1s再执行下一次循环,如果没有执行调度,则等待5s

一个任务下一次执行的时间已经在数据库了,代码拿出来这个值,加5秒,然后和当前时间的秒数进行比较,如果当前时间的秒数 比 下一次执行的时间加5秒还多,那么就是当前任务过期了,既然过期了,那么就要对这个任务做出判断,到底是立即执行,还是忽略

父子任务

父子任务,是指父任务执行后会自动调用子任务,完成一种类似于链式的调用,在使用时只需要配置子任务的ID即可

效果图:

参数传递

    @XxlJob("paramTask")
    public ReturnT paramTask() {
        XxlJobHelper.log(" paramTask的定时任务开始");
        //单个参数
        String param = XxlJobHelper.getJobParam();
        if (StringUtils.isNotBlank(param)) {
            log.info("单个参数:{}", param);
            //多个参数
            String[] params = param.split(",");
            log.info("第一个参数:{}", params[0]);
            log.info("第二个参数:{}", params[1]);
        }
        //返回参数  可用于查看结果、执行成功数等
        XxlJobHelper.handleSuccess("成功");
        return ReturnT.SUCCESS;
    }

日志回调

指执行器在执行任务时可以将执行日志传递给调度中心,即使任务没有执行完成,调度中心也可以看到回调的调度日志内容。更细化的分析任务执行情况

     @XxlJob("logbackTask")
    public ReturnT logbackTask() throws InterruptedException {
        XxlJobHelper.log("当前程序执行到了:{}行", 156);
        Thread.sleep(5000);
        XxlJobHelper.log("当前程序执行到了:{}行", 200);
        Thread.sleep(5000);
        XxlJobHelper.log("当前程序执行完成");
        XxlJobHelper.handleSuccess("成功");
        return ReturnT.SUCCESS;
    }

任务的生命周期

xxl-job支持在调用任务时,第一次调用时先执行指定的方法,然后在执行具体的任务,当执行器停止时会执行指定的方法


    @XxlJob(value="lifeCycleTask",init = "startInit",destroy = "destroy")
    public ReturnT lifeCycleTask() throws InterruptedException {
        log.info("==============生命周期演示============");
        XxlJobHelper.handleSuccess("成功");
        return ReturnT.SUCCESS;
    }
    public void startInit(){
        log.info("第一次调用当前任务时,才会执行,预处理");
    }

    public void destroy(){
        log.info("当执行器停止时才会执行");
    }

XXL-JOB缺点

“调度中心”通过DB锁的形式来保证集群分布式调度的一致性,如果耗时较小的短调度任务很多,随着调度中心集群数量增加,会增大DB的压力,数据库的锁竞争会比较厉害,造成数据库压力,性能下降。

更多推荐