elastic-job 借助zookeeper注册中心实现分布式job作业
spring scheduler 也可以实现直接的任务job,但是当我们的程序集群部署时,比如一个服务部署3个实例分别在不同的服务器上时,再使用这种单机版的任务调度器就会出现重复执行等问题,本文可使用dangdang网开源的elastic-job 框架借助zookeeper注册中心,实现分布式任务调度
一、引言
spring scheduler 也可以实现直接的任务job,但是当我们的程序集群部署时,比如一个服务部署3个实例分别在不同的服务器上时,再使用这种单机版的任务调度器就会出现重复执行等问题,本文使用dangdang网开源的elastic-job 框架借助zookeeper注册中心,实现分布式任务调度。
二、在SpringBoot项目中使用elastic-job
1)首先在pom文件中引入elastic-job的依赖jar包
<!-- 分布式任务调度 elastic-job-->
<dependency>
<artifactId>elastic-job-lite-core</artifactId>
<groupId>com.dangdang</groupId>
<version>2.1.5</version>
</dependency>
<dependency>
<artifactId>elastic-job-lite-spring</artifactId>
<groupId>com.dangdang</groupId>
<version>2.1.5</version>
</dependency>
2)然后在springboot的配置文件此处为application.yml中添加zookeeper注册中心的配置,以及job的配置,此处以闹铃提醒job为例(clockReminJob)
reg-center:
# 连接zookeeper服务器列表,多个地址用逗号分隔
server-list: 192.168.28.200:2181
# zk 的命名空间
namespace: elastic-job-lite-springboot
# 等待重试的间隔时间的初始值 默认1000,单位:毫秒
baseSleepTimeMilliseconds: 1000
# 等待重试的间隔时间的最大值 默认3000,单位:毫秒
maxSleepTimeMilliseconds: 3000
# 最大重试次数 默认3
maxRetries: 3
# 会话超时时间 默认60000,单位:毫秒
sessionTimeoutMilliseconds: 60000
# 连接超时时间 默认15000,单位:毫秒
connectionTimeoutMilliseconds: 15000
clockRemindJob:
job-name: clock
# cron表达式,用于控制作业触发时间,默认每间隔10秒钟执行一次 0 */10 * * * ?
cron: 0/10 * * * * ?
# 作业分片总数
sharding-total-count: 1
# 分片序列号和参数用等号分隔,多个键值对用逗号分隔.分片序列号从0开始,不可大于或等于作业分片总数
sharding-item-parameters: 0=全部
# 作业自定义参数,可通过传递该参数为作业调度的业务方法传参,用于实现带参数的作业
job-parameter: "my_parameter_clock"
# 是否开启任务执行失效转移,开启表示如果作业在一次任务执行中途宕机,允许将该次未完成的任务在另一作业节点上补偿执行, 默认为false
failover: true
# 是否开启错过任务重新执行 默认为true
misfire: true
# 作业描述信息
job-description: "clock Task every ten second"
3)创建zk注册中心配置
/**
* @date 2020-6-7
* zookeeper 注册中心配置类
*/
@Configuration
@ConditionalOnExpression("'${reg-Center.serverList}'.length()>0")
public class ZkRegistryCenterConfig {
@Value("${reg-center.server-list}")
private String serverList;
@Value("${reg-center.namespace}")
private String namespace;
@Bean(initMethod = "init")
public ZookeeperRegistryCenter regCenter(){
return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList,namespace));
}
}
4)自定义job作业实现SimpleJob接口,实现execute(...)方法,方法里面正是我们执行的内容
@Component
@Slf4j
public class ClockRemindJob implements SimpleJob {
public static int count = 1;
@Override
public void execute(ShardingContext shardingContext){
String jobParameter = shardingContext.getJobParameter();
String time = DateUtil.formatDate(new Date());
System.out.println("当前时间是:"+time+" ,job执行的总次数为:"+count++);
}
}
5)创建一个自定义的类,生成 LiteJobConfiguration,并从配置文件中读入配置参数(当然也可以通过其他方式传入参数),最后生成一个job配置类
@Configuration
@Data
public class ClockRemindJobConfig {
@Value("${clockRemindJob.cron}")
private String cron;
@Value("${clockRemindJob.sharding-total-count}")
private int shardingTotalCount;
@Value("${clockRemindJob.sharding-item-parameters}")
private String shardingItemParameters;
@Value("${clockRemindJob.job-description}")
private String jobDescription;
@Value("${clockRemindJob.job-parameter}")
private String jobParameter;
@Value("${clockRemindJob.job-name}")
private String jobName;
@Autowired
private ClockRemindJob clockRemindJob;
public LiteJobConfiguration liteJobConfiguration(){
JobCoreConfiguration.Builder builder =
JobCoreConfiguration.newBuilder(clockRemindJob.getClass().getName(),cron,shardingTotalCount);
JobCoreConfiguration jobCoreConfiguration = builder
.shardingItemParameters(shardingItemParameters)
.description(jobDescription)
.jobParameter(jobParameter)
.build();
SimpleJobConfiguration simpleJobConfiguration = new
SimpleJobConfiguration(jobCoreConfiguration,clockRemindJob.getClass().getCanonicalName());
return LiteJobConfiguration
.newBuilder(simpleJobConfiguration)
.overwrite(true)
.build();
}
}
6)最后再创建一个SpringScheduler(...)将这些配置联合起来以启动job
/**
* job 的配置,在此类中启动job
*/
@Configuration
public class JobConfig {
@Autowired
private ZookeeperRegistryCenter regCenter;
@Autowired
private ClockRemindJob alarmRemindJob;
@Autowired
private ClockRemindJobConfig clockRemindJobConfig;
// 容器启动的时候初始化该实例化bean
@Bean(initMethod = "init")
public JobScheduler simpleJobScheduler(){
// 把这三个配置组合起来
new SpringJobScheduler(alarmRemindJob,regCenter,clockRemindJobConfig.liteJobConfiguration()).init();
return null;
}
}
7)启动程序运行,控制台输出:每隔10秒钟打印一次,这和我们配置文件中配置的job执行频率(cron: 0/10 * * * * ?
)是一致的,
当前时间是:2020-06-07 22:16:10 ,job执行的总次数为:1
当前时间是:2020-06-07 22:16:20 ,job执行的总次数为:2
当前时间是:2020-06-07 22:16:30 ,job执行的总次数为:3
当前时间是:2020-06-07 22:16:40 ,job执行的总次数为:4
当前时间是:2020-06-07 22:16:50 ,job执行的总次数为:5
当前时间是:2020-06-07 22:17:00 ,job执行的总次数为:6
三、小结
elastic-job实现分布式job作业,可以满足我们在项目中任务调度、定时任务等的需求。有时在项目中,我们需要动态的根据相应条件动态的创建job,或者根据参数动态的执行相应的任务,后续继续探讨动态创建job以及job中的参数传递......
更多推荐
所有评论(0)