依旧是从ruoyi学习,这次是任务调度框架Quartz

参考

Quartz官方文档

quartz (从原理到应用)详解篇

Quartz 是什么?一文带你入坑 - 知乎

任务调度,也可说是定时任务,其实还是很常见的在我们的生活中,最常见的莫过于闹钟了,这就是一种定时任务,其他还有提醒事项,消息订阅等等

入门

关于Quartz,首先要知道JobJobDetailTriggerScheduler

我简单的用非官方的语言,我的理解,讲一下

  • Job:具体的调用接口,关键是要实现其中的execute方法

  • JobDetail:区别不同类型的任务,还包括Job实例的属性

  • Trigger:触发器,配置不同任务的执行策略

  • Scheduler:调度器,调度任务

<dependency>
    <groupId>org.quartz-scheduler</groupId>
    <artifactId>quartz</artifactId>
    <version>x.x.x</version>
</dependency>

Scheduler

Scheduler作为真正的任务调度器,肯定是需要配置各种参数的,拿ruoyi配置来看

/**
 * 定时任务配置
 *
 * @author ruoyi
 */
@Configuration
public class ScheduleConfig {
    @Bean
    public SchedulerFactoryBean schedulerFactoryBean(DataSource dataSource) {
        SchedulerFactoryBean factory = new SchedulerFactoryBean();
        factory.setDataSource(dataSource);

        // quartz参数
        Properties prop = new Properties();
        prop.put("org.quartz.scheduler.instanceName", "RuoyiScheduler");
        prop.put("org.quartz.scheduler.instanceId", "AUTO");
        // 线程池配置
        prop.put("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool");
        prop.put("org.quartz.threadPool.threadCount", "20");
        prop.put("org.quartz.threadPool.threadPriority", "5");
        // JobStore配置
        prop.put("org.quartz.jobStore.class", "org.quartz.impl.jdbcjobstore.JobStoreTX");
        // 集群配置
        prop.put("org.quartz.jobStore.isClustered", "true");
        prop.put("org.quartz.jobStore.clusterCheckinInterval", "15000");
        prop.put("org.quartz.jobStore.maxMisfiresToHandleAtATime", "1");
        prop.put("org.quartz.jobStore.txIsolationLevelSerializable", "true");

        // sqlserver 启用
        // prop.put("org.quartz.jobStore.selectWithLockSQL", "SELECT * FROM {0}LOCKS UPDLOCK WHERE LOCK_NAME = ?");
        prop.put("org.quartz.jobStore.misfireThreshold", "12000");
        prop.put("org.quartz.jobStore.tablePrefix", "QRTZ_");
        factory.setQuartzProperties(prop);

        factory.setSchedulerName("RuoyiScheduler");
        // 延时启动
        factory.setStartupDelay(1);
        factory.setApplicationContextSchedulerContextKey("applicationContextKey");
        // 可选,QuartzScheduler
        // 启动时更新己存在的Job,这样就不用每次修改targetObject后删除qrtz_job_details表对应记录了
        factory.setOverwriteExistingJobs(true);
        // 设置自动启动,默认为true
        factory.setAutoStartup(true);

        return factory;
    }
}

ruoyi的配置是一个很好的实战参考,其上的注解也写的非常明白了

主参数配置

Quartz主参数配置,可参考Quartz主配置ruoyi配置只有两项

  • org.quartz.scheduler.instanceName:本身对于调度程序没有意义,如是集群模式需要同一逻辑调度程序使用相同名称

  • org.quartz.scheduler.instanceId:调度程序id方法,配置为Auto则自动生成)

线程池配置

ThreadPool配置,可参考Quartz配置ThreadPool设置

  • org.quartz.threadPool.classThreadPool的实现类,Quartz自带的org.quartz.simpl.SimpleThreadPool已经可以了

  • org.quartz.threadPool.threadCount:线程数,根据场景需要而配置

  • org.quartz.threadPool.threadPriority:线程优先级

JobStore配置

顾名思义,就是说Job这些任务的存储方式,一共有三种方式

  • RAMJobStore:内存存储,快速方便,易丢失

  • JDBC-JobStoreTXJDBC-JobStoreCMT:这两个对比着说最好,他们都是通过JDBC来存储,因此不易失,两者主要区别是事务由谁管理,TX表示自己来管理事务,CMT表示加入全局事务管理,因为使用Spring事务管理的原因,应该大多数都是用前者吧

参考:

Job Stores

Quartz配置RAMJobStore

Quartz配置JDBC-JobStoreTX

Quartz配置JDBC-JobStoreCMT

数据源配置

详细请参考,Quartz配置DataSources

一旦前面使用了JDBC的存储方式,就一定要配置数据源的,ruoyi使用的是配置注入的方式,就是将ruoyi-framework配置的数据源以注入的方式进行配置,这里不细说了,有机会写ruoyi数据源设计再讲吧

Quartz提供了数据库文件,quartz.sql下载

集群配置

参考使用JDBC-JobStore配置群集

  • org.quartz.jobStore.isClustered:设置为“true”打开集群功能

  • org.quartz.jobStore.clusterCheckinInterval:检测集群间频率,单位毫秒

  • org.quartz.jobStore.maxMisfiresToHandleAtATime:在给定的通行证中,工作区将处理的最大错误次数触发。

  • org.quartz.jobStore.txIsolationLevelSerializable:“true”表示Quartz(使用JobStoreTXCMT)在JDBC连接上调用setTransactionIsolationConnection.TRANSACTION_SERIALIZABLE)。这可以有助于防止在高负载下的某些数据库的锁定超时以及“持久”事务。

  • org.quartz.jobStore.misfireThreshold:在被认为“失火”之前,调度程序将“容忍”一个Triggers将其下一个启动时间通过的毫秒数。

  • org.quartz.jobStore.tablePrefix:表前缀

其他

ruoyi剩下的配置就不多讲了,看注释,查源码即可

Job

关于Job,前面已经说过,需要实现Job接口,实现execute方法,看看ruoyi是怎么做的吧。

/**
 * 抽象quartz调用
 *
 * @author ruoyi
 */
public abstract class AbstractQuartzJob implements Job {
    private static final Logger log = LoggerFactory.getLogger(AbstractQuartzJob.class);

    /**
     * 线程本地变量
     */
    private static ThreadLocal<Date> threadLocal = new ThreadLocal<>();

    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        SysJob sysJob = new SysJob();
        BeanUtils.copyBeanProp(sysJob, context.getMergedJobDataMap().get(ScheduleConstants.TASK_PROPERTIES));
        try {
            before(context, sysJob);
            if (sysJob != null) {
                doExecute(context, sysJob);
            }
            after(context, sysJob, null);
        } catch (Exception e) {
            log.error("任务执行异常  - :", e);
            after(context, sysJob, e);
        }
    }

    /**
     * 执行前
     *
     * @param context 工作执行上下文对象
     * @param sysJob  系统计划任务
     */
    protected void before(JobExecutionContext context, SysJob sysJob) {
        threadLocal.set(new Date());
    }

    /**
     * 执行后
     *
     * @param context 工作执行上下文对象
     * @param sysJob  系统计划任务
     */
    protected void after(JobExecutionContext context, SysJob sysJob, Exception e) {
        Date startTime = threadLocal.get();
        threadLocal.remove();

        final SysJobLog sysJobLog = new SysJobLog();
        sysJobLog.setJobName(sysJob.getJobName());
        sysJobLog.setJobGroup(sysJob.getJobGroup());
        sysJobLog.setInvokeTarget(sysJob.getInvokeTarget());
        sysJobLog.setStartTime(startTime);
        sysJobLog.setStopTime(new Date());
        long runMs = sysJobLog.getStopTime().getTime() - sysJobLog.getStartTime().getTime();
        sysJobLog.setJobMessage(sysJobLog.getJobName() + " 总共耗时:" + runMs + "毫秒");
        if (e != null) {
            sysJobLog.setStatus(Constants.FAIL);
            String errorMsg = StringUtils.substring(ExceptionUtil.getExceptionMessage(e), 0, 2000);
            sysJobLog.setExceptionInfo(errorMsg);
        } else {
            sysJobLog.setStatus(Constants.SUCCESS);
        }

        // 写入数据库当中
        SpringUtils.getBean(ISysJobLogService.class).addJobLog(sysJobLog);
    }

    /**
     * 执行方法,由子类重载
     *
     * @param context 工作执行上下文对象
     * @param sysJob  系统计划任务
     * @throws Exception 执行过程中的异常
     */
    protected abstract void doExecute(JobExecutionContext context, SysJob sysJob) throws Exception;
}

这里定义的是抽象类,抽象方法为doExecute,从@Overrideexecute可知,doExecute方法作为execute的中间部分被执行,夹在beforeafter方法之中。说到这个必须要提一下这个线程本地变量ThreadLocal,之前只是觉得面试题中见得多,现在看的代码多了,就知道了实际生产中用的真不少。这里ThreadLocal就不必过多介绍了,保存线程级变量,互不干扰。这里保存的是Date数据,在before方法中设置,在after中获取,并记录job日志,写入数据库。

抽象类看完了,接下来就是具体实现类了,这里有两个

/**
 * 定时任务处理(禁止并发执行)
 *
 * @author ruoyi
 */
@DisallowConcurrentExecution
public class QuartzDisallowConcurrentExecution extends AbstractQuartzJob {
    @Override
    protected void doExecute(JobExecutionContext context, SysJob sysJob) throws Exception {
        JobInvokeUtil.invokeMethod(sysJob);
    }
}
/**
 * 定时任务处理(允许并发执行)
 *
 * @author ruoyi
 */
public class QuartzJobExecution extends AbstractQuartzJob {
    @Override
    protected void doExecute(JobExecutionContext context, SysJob sysJob) throws Exception {
        JobInvokeUtil.invokeMethod(sysJob);
    }
}

从命名和有无@DisallowConcurrentExecution注解就可区分,关键是是否允许并发执行!?

可以看到它们都是调用了JobInvokeUtil工具,如下(因为代码过多,有些方法省略掉了)

/**
 * 任务执行工具
 *
 * @author ruoyi
 */
public class JobInvokeUtil {
    /**
     * 执行方法
     *
     * @param sysJob 系统任务
     */
    public static void invokeMethod(SysJob sysJob) throws Exception {
        String invokeTarget = sysJob.getInvokeTarget();
        String beanName = getBeanName(invokeTarget);
        String methodName = getMethodName(invokeTarget);
        List<Object[]> methodParams = getMethodParams(invokeTarget);

        if (!isValidClassName(beanName)) {
            Object bean = SpringUtils.getBean(beanName);
            invokeMethod(bean, methodName, methodParams);
        } else {
            Object bean = Class.forName(beanName).newInstance();
            invokeMethod(bean, methodName, methodParams);
        }
    }

    /**
     * 调用任务方法
     *
     * @param bean         目标对象
     * @param methodName   方法名称
     * @param methodParams 方法参数
     */
    private static void invokeMethod(Object bean, String methodName, List<Object[]> methodParams)
            throws NoSuchMethodException, SecurityException, IllegalAccessException, IllegalArgumentException,
            InvocationTargetException {
        if (StringUtils.isNotNull(methodParams) && methodParams.size() > 0) {
            Method method = bean.getClass().getDeclaredMethod(methodName, getMethodParamsType(methodParams));
            method.invoke(bean, getMethodParamsValue(methodParams));
        } else {
            Method method = bean.getClass().getDeclaredMethod(methodName);
            method.invoke(bean);
        }
    }

    /**
     * 校验是否为为class包名
     *
     * @param str 名称
     * @return true是 false否
     */
    public static boolean isValidClassName(String invokeTarget) {
        return StringUtils.countMatches(invokeTarget, ".") > 1;
    }

    /**
     * 获取bean名称
     *
     * @param invokeTarget 目标字符串
     * @return bean名称
     */
    public static String getBeanName(String invokeTarget) {
        String beanName = StringUtils.substringBefore(invokeTarget, "(");
        return StringUtils.substringBeforeLast(beanName, ".");
    }

    /**
     * 获取bean方法
     *
     * @param invokeTarget 目标字符串
     * @return method方法
     */
    public static String getMethodName(String invokeTarget) {
        String methodName = StringUtils.substringBefore(invokeTarget, "(");
        return StringUtils.substringAfterLast(methodName, ".");
    }

    /**
     * 获取method方法参数相关列表
     *
     * @param invokeTarget 目标字符串
     * @return method方法相关参数列表
     */
    public static List<Object[]> getMethodParams(String invokeTarget) {
        ...
    }

    /**
     * 获取参数类型
     *
     * @param methodParams 参数相关列表
     * @return 参数类型列表
     */
    public static Class<?>[] getMethodParamsType(List<Object[]> methodParams) {
        ...
    }

    /**
     * 获取参数值
     *
     * @param methodParams 参数相关列表
     * @return 参数值列表
     */
    public static Object[] getMethodParamsValue(List<Object[]> methodParams) {
        ...
    }
}

invokeMethod(SysJob sysJob)方法来看,先从SysJob获取目标字符串(大概就是“类名.方法名(参数…)”这样的);然后分别获取类名、方法名、方法参数;在Spring容器中有的交予bean执行,没有的用反射的方式实例化一个。最后在invokeMethod(Object bean, String methodName, List<Object[]> methodParams)的方法中实现。本类中其他的方法都是辅助做验证,解析的,不在多提。

Trigger

前面也提到了,Trigger触发器,关系到Job的执行策略,关联SchedulerJob。下面便是关键的创建Scheduler代码。

/**
 * 定时任务工具类
 *
 * @author ruoyi
 */
public class ScheduleUtils {
    /**
     * 得到quartz任务类
     *
     * @param sysJob 执行计划
     * @return 具体执行任务类
     */
    private static Class<? extends Job> getQuartzJobClass(SysJob sysJob) {
        boolean isConcurrent = "0".equals(sysJob.getConcurrent());
        return isConcurrent ? QuartzJobExecution.class : QuartzDisallowConcurrentExecution.class;
    }

    /**
     * 构建任务触发对象
     */
    public static TriggerKey getTriggerKey(Long jobId, String jobGroup) {
        return TriggerKey.triggerKey(ScheduleConstants.TASK_CLASS_NAME + jobId, jobGroup);
    }

    /**
     * 构建任务键对象
     */
    public static JobKey getJobKey(Long jobId, String jobGroup) {
        return JobKey.jobKey(ScheduleConstants.TASK_CLASS_NAME + jobId, jobGroup);
    }

    /**
     * 创建定时任务
     */
    public static void createScheduleJob(Scheduler scheduler, SysJob job) throws SchedulerException, TaskException {
        Class<? extends Job> jobClass = getQuartzJobClass(job);
        // 构建job信息
        Long jobId = job.getJobId();
        String jobGroup = job.getJobGroup();
        JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(getJobKey(jobId, jobGroup)).build();

        // 表达式调度构建器
        CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(job.getCronExpression());
        cronScheduleBuilder = handleCronScheduleMisfirePolicy(job, cronScheduleBuilder);

        // 按新的cronExpression表达式构建一个新的trigger
        CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(getTriggerKey(jobId, jobGroup))
                .withSchedule(cronScheduleBuilder).build();

        // 放入参数,运行时的方法可以获取
        jobDetail.getJobDataMap().put(ScheduleConstants.TASK_PROPERTIES, job);

        // 判断是否存在
        if (scheduler.checkExists(getJobKey(jobId, jobGroup))) {
            // 防止创建时存在数据问题 先移除,然后在执行创建操作
            scheduler.deleteJob(getJobKey(jobId, jobGroup));
        }

        scheduler.scheduleJob(jobDetail, trigger);

        // 暂停任务
        if (job.getStatus().equals(ScheduleConstants.Status.PAUSE.getValue())) {
            scheduler.pauseJob(ScheduleUtils.getJobKey(jobId, jobGroup));
        }
    }

    /**
     * 设置定时任务策略
     */
    public static CronScheduleBuilder handleCronScheduleMisfirePolicy(SysJob job, CronScheduleBuilder cb)
            throws TaskException {
        switch (job.getMisfirePolicy()) {
            case ScheduleConstants.MISFIRE_DEFAULT:
                return cb;
            case ScheduleConstants.MISFIRE_IGNORE_MISFIRES:
                return cb.withMisfireHandlingInstructionIgnoreMisfires();
            case ScheduleConstants.MISFIRE_FIRE_AND_PROCEED:
                return cb.withMisfireHandlingInstructionFireAndProceed();
            case ScheduleConstants.MISFIRE_DO_NOTHING:
                return cb.withMisfireHandlingInstructionDoNothing();
            default:
                throw new TaskException("The task misfire policy '" + job.getMisfirePolicy()
                        + "' cannot be used in cron schedule tasks", Code.CONFIG_ERROR);
        }
    }
}

ruoyi代码写的很漂亮的,注释也非常清楚。直接从createScheduleJob方法来看,首先根据job是否支持并发确定class信息;然后根据Job创建JobDetail,用getJobKey(Long jobId, String jobGroup)区别标识;然后创建表达式构建器,ruoyi只使用cron表达式的方式(本身cron表达式就很有优势),同时根据MisfirePolicy使用不同策略;然后根据表达式构建器创建Trigger,类似于上的getTriggerKey(Long jobId, String jobGroup)方法区别标识;JobDetail放入Job对象,方便随时获取;最后就是将对应的JobDetailTrigger加入任务调度器中。

最后后,通过IDE找到调用createScheduleJob方法的地方,就三个,一个SysJobServiceImplinit方法,另外就是新增和修改Job方法中了。

至此

至此,ruoyi有关于quartz的使用几乎就讲完了,剩下的就是接口和业务设计的问题了,还有一个很重要的点,应该启动体验一下ruoyi设计这个过程,这个留到下次想起来再补充吧😂😂😂

思考

是否可以复用job,多个trigger共同调度?

就是说如果同一个job,它的执行策略比较复杂,一个cron表达式不够用,怎样能在复用这个job的情况下,将它设置多个Trigger呢?

还有。。。

Logo

快速构建 Web 应用程序

更多推荐