Spring Boot结合quartz实现数据库动态启动Bean下的方法

项目代码已上传git:

https://gitee.com/gangye/springboot_quartz_schedule

或者使用csdn下载链接:

https://download.csdn.net/download/xibei19921101/12260991

相关任务以及执行结果:

Bean实例addNumWorker下定时任务方法work插入结果(上述cron表达式为每天17点4分每个20秒插入一批数据)

Bean实例下proStatisticsWorker定时任务方法work统计插入结果(上述cron表达式为每天17点5分30秒查询插入统计结果)

执行日志查看:

项目搭建过程:

项目目录简介:主要多了这些,其余的和正常的springboot的web项目一致,在service中进行了定时任务的启动

1.创建一个maven项目引入Spring Boot、Mybatis、quartz相关依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.crcb</groupId>
    <artifactId>springbootCornTask</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.3.RELEASE</version>
    </parent>

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <log4j.version>1.7.10</log4j.version>
        <org.springframework.version>4.0.6.RELEASE</org.springframework.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>5.1.7.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context-support</artifactId>
            <version>5.1.7.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>

        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.0.1</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>

        <!-- quartz spring 3.1以上才支持quartz 2.2.1 -->
        <dependency>
            <groupId>org.quartz-scheduler</groupId>
            <artifactId>quartz</artifactId>
            <version>2.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.quartz-scheduler</groupId>
            <artifactId>quartz-jobs</artifactId>
            <version>2.2.1</version>
        </dependency>
    </dependencies>

</project>

2.编写项目的配置文件,应用配置、日志配置、mybatis的一些配置

1.application.properties(注:此处在url中设置时区,解决时间插入少8小时的问题(代码中产生的时间没问题,插入时有问题)

时区的配置问题可参考:https://blog.csdn.net/u010921682/article/details/100585832

server.port=8089

#spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
#spring.jackson.time-zone=Asia/Shanghai

#数据库连接池设置
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/spring_vue_demo?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8
spring.datasource.username=root
spring.datasource.password=ok

#mybatis的相关配置
mybatis.mapper-locations=classpath:mapper/*.xml
mybatis.config-location=classpath:mybatis-config.xml

2.mybatis-config.xml以及logback-config.xml参考我的另外一篇博客:

https://blog.csdn.net/xibei19921101/article/details/104717453

3.工具类Springutils、反射调用scheduleJob中定义的方法、计划任务执行处等类

1.SpringUtils.java文件(注:此处的SpringUtils工具类必须引入@Component注解,后续要使用反射,要依赖注入)

package com.crcb.utils;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.stereotype.Component;

/**
 * @Classname SpringUtils
 * @Description
 * @Date 2020/3/19 9:10
 * @Created by gangye
 */
@Component
public final class SpringUtils implements BeanFactoryPostProcessor {

    private static ConfigurableListableBeanFactory beanFactory; // Spring应用上下文环境

    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
        SpringUtils.beanFactory = beanFactory;
    }

    /**
     * 获取对象
     *
     * @param name
     * @return Object 一个以所给名字注册的bean的实例
     * @throws BeansException
     */
    @SuppressWarnings("unchecked")
    public static <T> T getBean(String name) throws BeansException {
        return (T) beanFactory.getBean(name);
    }

    /**
     * 获取对象
     * @return Object 一个以所给名字注册的bean的实例
     * @throws BeansException
     */
    @SuppressWarnings("unchecked")
    public static <T> T getBeanByType(Class<T> clzee) throws BeansException {
        try {
            return beanFactory.createBean(clzee);
        } catch (NoSuchBeanDefinitionException e) {
            return null;
        }

    }

    /**
     * 注入一个对象
     * @return Object 一个以所给名字注册的bean的实例
     * @throws BeansException
     */
    @SuppressWarnings("unchecked")
    public static void setBean(String springId, Object obj) throws BeansException {
        beanFactory.registerSingleton(springId, obj);
    }

    /**
     * 获取类型为requiredType的对象
     * @return
     * @throws BeansException
     */
    public static <T> T getBean(Class<T> clz) throws BeansException {
        try {
            @SuppressWarnings("unchecked")
            T result = (T) beanFactory.getBean(clz);
            return result;
        } catch (NoSuchBeanDefinitionException e) {
            return null;
        }
    }

    /**
     * 如果BeanFactory包含一个与所给名称匹配的bean定义,则返回true
     *
     * @param name
     * @return boolean
     */
    public static boolean containsBean(String name) {
        return beanFactory.containsBean(name);
    }

    /**
     * 判断以给定名字注册的bean定义是一个singleton还是一个prototype。
     * 如果与给定名字相应的bean定义没有被找到,将会抛出一个异常(NoSuchBeanDefinitionException)
     *
     * @param name
     * @return boolean
     * @throws NoSuchBeanDefinitionException
     */
    public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException {
        return beanFactory.isSingleton(name);
    }

    /**
     * @param name
     * @return Class 注册对象的类型
     * @throws NoSuchBeanDefinitionException
     */
    public static Class<?> getType(String name) throws NoSuchBeanDefinitionException {
        return beanFactory.getType(name);
    }

    /**
     * 如果给定的bean名字在bean定义中有别名,则返回这些别名
     *
     * @param name
     * @return
     * @throws NoSuchBeanDefinitionException
     */
    public static String[] getAliases(String name) throws NoSuchBeanDefinitionException {
        return beanFactory.getAliases(name);
    }

}

2.TaskUtils.java(反射调用scheduleJob中定义的方法)

package com.crcb.task;

import com.crcb.entity.ScheduleJob;
import com.crcb.utils.LogUtils;
import com.crcb.utils.Response;
import com.crcb.utils.SpringUtils;
import com.crcb.utils.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;

import java.lang.reflect.Method;

public class TaskUtils {

    @Autowired
    private ApplicationContext applicationContext;
    /**
     * 通过反射调用scheduleJob中定义的方法
     *
     * @param scheduleJob
     */
    public static void invokMethod(ScheduleJob scheduleJob) {

        try {//添加最大的异常捕获
            String springId = scheduleJob.getSpringId();
            Object object = null;
            Class clazz = null;

            //根据反射来进行
            if (StringUtils.isNotBlank(springId)) {
                object = SpringUtils.getBean(springId);
            }

            if (object == null && StringUtils.isNotBlank(scheduleJob.getBeanClass())) {
                String jobStr = "定时任务名称 = [" + scheduleJob.getJobName() + "]-在spring 中没有这个 springId, 通过 class type 获取中...";
                LogUtils.info(jobStr, scheduleJob.getBeanClass());
                try {
                    clazz = Class.forName(scheduleJob.getBeanClass());
                    object = SpringUtils.getBean(clazz);
                    if(object == null){
                        jobStr = "定时任务名称 = [" + scheduleJob.getJobName() + "]-在spring 中没有获得 bean, 调用 spring 方法再次构建中...";
                        LogUtils.info(jobStr, scheduleJob.getBeanClass());
                        object = SpringUtils.getBeanByType(clazz);
                    }
                    if (StringUtils.isNotBlank(springId)) {
                        SpringUtils.setBean(springId, object);
                        LogUtils.info("spring bean 构建完成并加入到容器中 ", scheduleJob.getBeanClass());
                    }
                    LogUtils.info("定时任务 spring bean 构建成功! ", scheduleJob.getBeanClass());
                } catch (Exception e) {
                    LogUtils.error("定时任务 spring bean 构建失败了!!! ", scheduleJob.getBeanClass(), e);
                    Response.newResponse().error(e);
                    return;
                }
            }

            clazz = object.getClass();
            Method method = null;
            try {
                method = clazz.getDeclaredMethod(scheduleJob.getMethodName());
            } catch (NoSuchMethodException e) {
                String jobStr = "定时任务名称 = [" + scheduleJob.getJobName() + "] = 未启动成功,方法名设置错误!!!";
                LogUtils.error(jobStr, e);
            } catch (SecurityException e) {
                LogUtils.error("TaskUtils发生异常", e);
                Response.newResponse().error(e);
            }
            if (method != null) {
                try {
                    method.invoke(object);
                    LogUtils.info("定时任务名称 = [" + scheduleJob.getJobName() + "] = 启动成功");
                } catch (Exception e) {
                    Response.newResponse().error(e);
                    LogUtils.error("定时任务名称 = [" + scheduleJob.getJobName() + "] = 启动失败了!!!", e);
                    return;
                }
            } else {
                String jobStr = "定时任务名称 = [" + scheduleJob.getJobName() + "] = 启动失败了!!!";
                LogUtils.error(jobStr, clazz.getName(), "not find method ");
            }

        } catch (Exception e) {//添加最大的异常捕获
            Response.newResponse().error(e);
            LogUtils.error("定时任务名称 = [" + scheduleJob.getJobName() + "] = 启动失败了!!!", e);
        }

    }

}

3.QuartzJobFactory.java

package com.crcb.task;

import com.crcb.entity.ScheduleJob;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;


/**
 * @Description: 计划任务执行处 无状态
 */
public class QuartzJobFactory implements Job {
	public void execute(JobExecutionContext context) throws JobExecutionException {
		ScheduleJob scheduleJob = (ScheduleJob) context.getMergedJobDataMap().get("scheduleJob");
		TaskUtils.invokMethod(scheduleJob);
	}
}

4.QuartzJobFactoryDisallowConcurrentExecution.java(若一个方法一次执行不完下次轮转时则等待改方法执行完后才执行下一次操作)

package com.crcb.task;

import com.crcb.entity.ScheduleJob;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;

/**
 * @Description: 若一个方法一次执行不完下次轮转时则等待改方法执行完后才执行下一次操作
 */
@DisallowConcurrentExecution
public class QuartzJobFactoryDisallowConcurrentExecution implements Job {

	public void execute(JobExecutionContext context) throws JobExecutionException {
		ScheduleJob scheduleJob = (ScheduleJob) context.getMergedJobDataMap().get("scheduleJob");
		TaskUtils.invokMethod(scheduleJob);

	}
}

4.Schedule实体类、mapper层、service层的编写

package com.crcb.entity;

import java.util.Date;

/**
 * @Classname ScheduleJob
 * @Description 任务
 * @Date 2020/3/18 18:47
 * @Created by gangye
 */
public class ScheduleJob {
    public static final String STATUS_RUNNING = "1";
    public static final String STATUS_NOT_RUNNING = "0";
    public static final String CONCURRENT_IS = "1";
    public static final String CONCURRENT_NOT = "0";

    private Long jobId;

    private Date createTime;

    private Date updateTime;
    /**
     * 任务名称
     */
    private String jobName;
    /**
     * 任务分组
     */
    private String jobGroup;
    /**
     * 任务状态 是否启动任务
     */
    private String jobStatus;
    /**
     * cron表达式
     */
    private String cronExpression;
    /**
     * 描述
     */
    private String description;
    /**
     * 任务执行时调用哪个类的方法 包名+类名
     */
    private String beanClass;
    /**
     * 任务是否有状态
     */
    private String isConcurrent;
    /**
     * spring bean
     */
    private String springId;
    /**
     * 任务调用的方法名
     */
    private String methodName;

    //此处省略getter、setter方法,不推荐使用@Data注解
}

省去mapper层介绍,由于项目代码已上传git,在service层中的实体类中引入Job

package com.crcb.service.impl;

import com.crcb.entity.ScheduleJob;
import com.crcb.mapper.SchedulerMapper;
import com.crcb.service.SchedulerService;
import com.crcb.task.QuartzJobFactory;
import com.crcb.task.QuartzJobFactoryDisallowConcurrentExecution;
import com.crcb.utils.LogUtils;
import org.quartz.*;
import org.quartz.impl.matchers.GroupMatcher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Set;

/**
 * @Classname SchedulerServiceImpl
 * @Description 定时任务管理
 * @Date 2020/3/18 14:10
 * @Created by gangye
 */
@Service
public class SchedulerServiceImpl implements SchedulerService {

    @Autowired
    private SchedulerFactoryBean schedulerFactoryBean;

    @Autowired
    private SchedulerMapper schedulerMapper;

    /**
     * 从数据库中取 区别于getAllJob
     *
     * @return
     */
    public List<ScheduleJob> getAllTask() {
        return schedulerMapper.getAll();
    }

    /**
     * 添加到数据库中 区别于addJob
     */
    @Transactional(propagation = Propagation.REQUIRED)
    public void addTask(ScheduleJob job) throws SchedulerException {
        job.setCreateTime(new Date());
        schedulerMapper.insert(job);
        addJob(job);
    }

    /**
     * 从数据库中查询job
     */
    public ScheduleJob getTaskById(Long jobId) {
        return schedulerMapper.selectByPrimaryKey(jobId);
    }

    /**
     * 更改任务状态
     *
     * @throws SchedulerException
     */
    @Transactional(propagation = Propagation.REQUIRED)
    public void changeStatus(Long jobId, String cmd) throws SchedulerException {
        ScheduleJob job = getTaskById(jobId);
        if (job == null) {
            return;
        }
        if ("stop".equals(cmd)) {
            deleteJob(job);
            job.setJobStatus(ScheduleJob.STATUS_NOT_RUNNING);
        } else if ("start".equals(cmd)) {
            job.setJobStatus(ScheduleJob.STATUS_RUNNING);
            addJob(job);
        }
        schedulerMapper.update(job);
    }

    /**
     * 更改任务 cron表达式
     *
     * @throws SchedulerException
     */
    @Transactional(propagation = Propagation.REQUIRED)
    public void updateCron(Long jobId, String cron) throws SchedulerException {
        ScheduleJob job = getTaskById(jobId);
        if (job == null) {
            return;
        }
        job.setCronExpression(cron);
        if (ScheduleJob.STATUS_RUNNING.equals(job.getJobStatus())) {
            updateJobCron(job);
        }
        schedulerMapper.update(job);

    }

    /**
     * 添加任务
     *
     * @param job
     * @throws SchedulerException
     */
    @Transactional(propagation = Propagation.REQUIRED)
    public void addJob(ScheduleJob job) throws SchedulerException {
        if (job == null || !ScheduleJob.STATUS_RUNNING.equals(job.getJobStatus())) {
            return;
        }

        Scheduler scheduler = schedulerFactoryBean.getScheduler();
        LogUtils.info(scheduler + ".......................................................................................add");
        TriggerKey triggerKey = TriggerKey.triggerKey(job.getJobName(), job.getJobGroup());

        CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);

        // 不存在,创建一个
        if (null == trigger) {
            Class clazz = ScheduleJob.CONCURRENT_IS.equals(job.getIsConcurrent()) ? QuartzJobFactory.class : QuartzJobFactoryDisallowConcurrentExecution.class;

            JobDetail jobDetail = JobBuilder.newJob(clazz).withIdentity(job.getJobName(), job.getJobGroup()).build();

            jobDetail.getJobDataMap().put("scheduleJob", job);

            CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(job.getCronExpression());

            trigger = TriggerBuilder.newTrigger().withIdentity(job.getJobName(), job.getJobGroup()).withSchedule(scheduleBuilder).build();

            scheduler.scheduleJob(jobDetail, trigger);
        } else {
            // Trigger已存在,那么更新相应的定时设置
            CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(job.getCronExpression());

            // 按新的cronExpression表达式重新构建trigger
            trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).build();

            // 按新的trigger重新设置job执行
            scheduler.rescheduleJob(triggerKey, trigger);
        }
    }

    @PostConstruct
    public void init() throws Exception {

        LogUtils.info("实例化List<ScheduleJob>,从数据库读取....",this);

        // 这里获取任务信息数据
        List<ScheduleJob> jobList = schedulerMapper.getAll();

        for (ScheduleJob job : jobList) {
            addJob(job);
        }
    }

    /**
     * 获取所有计划中的任务列表
     *
     * @return
     * @throws SchedulerException
     */
    public List<ScheduleJob> getAllJob() throws SchedulerException {
        Scheduler scheduler = schedulerFactoryBean.getScheduler();
        GroupMatcher<JobKey> matcher = GroupMatcher.anyJobGroup();
        Set<JobKey> jobKeys = scheduler.getJobKeys(matcher);
        List<ScheduleJob> jobList = new ArrayList<ScheduleJob>();
        for (JobKey jobKey : jobKeys) {
            List<? extends Trigger> triggers = scheduler.getTriggersOfJob(jobKey);
            for (Trigger trigger : triggers) {
                ScheduleJob job = new ScheduleJob();
                job.setJobName(jobKey.getName());
                job.setJobGroup(jobKey.getGroup());
                job.setDescription("触发器:" + trigger.getKey());
                Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
                job.setJobStatus(triggerState.name());
                if (trigger instanceof CronTrigger) {
                    CronTrigger cronTrigger = (CronTrigger) trigger;
                    String cronExpression = cronTrigger.getCronExpression();
                    job.setCronExpression(cronExpression);
                }
                jobList.add(job);
            }
        }
        return jobList;
    }

    /**
     * 所有正在运行的job
     *
     * @return
     * @throws SchedulerException
     */
    public List<ScheduleJob> getRunningJob() throws SchedulerException {
        Scheduler scheduler = schedulerFactoryBean.getScheduler();
        List<JobExecutionContext> executingJobs = scheduler.getCurrentlyExecutingJobs();
        List<ScheduleJob> jobList = new ArrayList<ScheduleJob>(executingJobs.size());
        for (JobExecutionContext executingJob : executingJobs) {
            ScheduleJob job = new ScheduleJob();
            JobDetail jobDetail = executingJob.getJobDetail();
            JobKey jobKey = jobDetail.getKey();
            Trigger trigger = executingJob.getTrigger();
            job.setJobName(jobKey.getName());
            job.setJobGroup(jobKey.getGroup());
            job.setDescription("触发器:" + trigger.getKey());
            Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
            job.setJobStatus(triggerState.name());
            if (trigger instanceof CronTrigger) {
                CronTrigger cronTrigger = (CronTrigger) trigger;
                String cronExpression = cronTrigger.getCronExpression();
                job.setCronExpression(cronExpression);
            }
            jobList.add(job);
        }
        return jobList;
    }

    /**
     * 暂停一个job
     *
     * @param scheduleJob
     * @throws SchedulerException
     */
    public void pauseJob(ScheduleJob scheduleJob) throws SchedulerException {
        Scheduler scheduler = schedulerFactoryBean.getScheduler();
        JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup());
        scheduler.pauseJob(jobKey);
    }

    /**
     * 恢复一个job
     *
     * @param scheduleJob
     * @throws SchedulerException
     */
    public void resumeJob(ScheduleJob scheduleJob) throws SchedulerException {
        Scheduler scheduler = schedulerFactoryBean.getScheduler();
        JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup());
        scheduler.resumeJob(jobKey);
    }

    /**
     * 删除一个job
     *
     * @param scheduleJob
     * @throws SchedulerException
     */
    public void deleteJob(ScheduleJob scheduleJob) throws SchedulerException {
        Scheduler scheduler = schedulerFactoryBean.getScheduler();
        JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup());
        scheduler.deleteJob(jobKey);

    }

    /**
     * 立即执行job
     *
     * @param scheduleJob
     * @throws SchedulerException
     */
    public void runAJobNow(ScheduleJob scheduleJob) throws SchedulerException {
        Scheduler scheduler = schedulerFactoryBean.getScheduler();
        JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup());
        scheduler.triggerJob(jobKey);
    }

    /**
     * 更新job时间表达式
     *
     * @param scheduleJob
     * @throws SchedulerException
     */
    public void updateJobCron(ScheduleJob scheduleJob) throws SchedulerException {
        Scheduler scheduler = schedulerFactoryBean.getScheduler();

        TriggerKey triggerKey = TriggerKey.triggerKey(scheduleJob.getJobName(), scheduleJob.getJobGroup());

        CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);

        CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(scheduleJob.getCronExpression());

        trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).build();

        scheduler.rescheduleJob(triggerKey, trigger);
    }
}

编写定时任务的逻辑,注:在数据库中配置sping_id时,spring依赖注入的时候,类名首字母小写,所以要小写

Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐