springBoot中使用elasticjob
elasticjob执行体中最重要的两个参数是;分片总数,分片项。分片总数:集群的节点总数(类似于银行办理业务窗口的总柜台数)分片项:把每个排序的话,默认从0开始,最大值不会超过分片总数,业务受理时会拿到的业务编号(类似于你去银行柜台办理业务,你拿到的排队编号)假设我们规定排队总长度只有10个,但是启动的服务节点只有三个,那么elasticjob会给每个节点根据zookeeper实际节点数取余N%
elasticjob
执行体中最重要的两个参数是;分片总数,分片项。
分片总数:集群的节点总数(类似于银行办理业务窗口的总柜台数)
分片项:把每个排序的话,默认从0开始,最大值不会超过分片总数,业务受理时会拿到的业务编号(类似于你去银行柜台办理业务,你拿到的排队编号)
假设我们规定排队总长度只有10个,但是启动的服务节点只有三个,那么elasticjob会给每个节点根据zookeeper实际节点数取余N%3;
那么三个节点可能受理的编号情况是,
节点1包含0,1,2
节点2包含3,4,5
节点3包含6,7,8,9
这样每个节点只取它自己受理部分的编号,就不会出现重复消费和遗漏的问题了。
比如总分片数为4,两个节点,每五秒执行一次,一分钟后在启动第二个节点
他们的执行打印情况是
节点1:
分片项 ShardingItem: 0 | 运行时间: 14:18:40 | 线程ID: 58 | 分片参数: C
分片项 ShardingItem: 1 | 运行时间: 14:18:40 | 线程ID: 59 | 分片参数: D
分片项 ShardingItem: 2 | 运行时间: 14:18:40 | 线程ID: 60 | 分片参数: null
分片项 ShardingItem: 3 | 运行时间: 14:18:40 | 线程ID: 61 | 分片参数: null
分片项 ShardingItem: 0 | 运行时间: 14:20:50 | 线程ID: 72 | 分片参数: C
分片项 ShardingItem: 1 | 运行时间: 14:20:50 | 线程ID: 73 | 分片参数: D
分片项 ShardingItem: 0 | 运行时间: 14:20:55 | 线程ID: 60 | 分片参数: C
节点2
分片项 ShardingItem: 2 | 运行时间: 14:56:10 | 线程ID: 66 | 分片参数: null
分片项 ShardingItem: 3 | 运行时间: 14:56:10 | 线程ID: 67 | 分片参数: null
分片项 ShardingItem: 3 | 运行时间: 14:56:15 | 线程ID: 69 | 分片参数: null
分片项 ShardingItem: 2 | 运行时间: 14:56:15 | 线程ID: 68 | 分片参数: null
可以看出节点1一开始是全部负责,后面把2,3分给了节点2处理,这样两个节点各处理两个
总数固定的情况下,随着集群数量的增加,每个节点拥有的分片项也会有变化
@Component
public class MySimpleJob implements SimpleJob {
//@Autowired
//private OrderService orderService;//业务实现接口
@Override
public void execute(ShardingContext shardingContext) {
log.info("我是分片项:"+shardingContext.getShardingItem()+",总分片数是:"+
shardingContext.getShardingTotalCount());
}
}
springBoot中使用
生成一个springboot项目
https://start.spring.io/
pom
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.5</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.bamboo</groupId>
<artifactId>elastic-job-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>elastic-job-demo</name>
<description>Demo project for Spring Boot</description>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>2.1.5</version>
</dependency>
<dependency>
<artifactId>elastic-job-lite-core</artifactId>
<groupId>com.dangdang</groupId>
<version>2.1.5</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.properties
server.port= 8082
# zookeeper config
regCenter.serverList=localhost:2181
regCenter.namespace=demo009
# 每隔20秒执行
mailSendJob.cron=0/5 * * * * ?
# 总分片数
shardingCategory.shardingTotalCount=10
shardingCategory.shardingItemParameters=0=A,1=B,2=C,3=D,4=E,5=F,6=G,7=H,8=I,9=J
simpleJob.cron= 0/5 * * * * ?
simpleJob.shardingTotalCount= 4
simpleJob.shardingItemParameters= 0=A,1=B,0=C,1=D
job
zookeeper配置信息初始化
package com.bamboo.elasticjobdemo.config;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@ConditionalOnExpression("'${regCenter.serverList}'.length() > 0")
public class ZookeeperRegistryCenterConfig {
@Bean(initMethod = "init")
public ZookeeperRegistryCenter regCenter(@Value("${regCenter.serverList}") final String serverList,
@Value("${regCenter.namespace}") final String namespace) {
return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
}
}
job配置初始化
package com.bamboo.elasticjobdemo.config;
import com.bamboo.elasticjobdemo.job.MySimpleJob;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
@Configuration
public class MySimpleJobConfig {
@Resource
private ZookeeperRegistryCenter regCenter;
/**
* 自己实现的job
* */
@Bean
public SimpleJob simpleJob() {
return new MySimpleJob();
}
/**
* 将自己实现的job加入调度中执行
* */
@Bean(initMethod = "init")
public JobScheduler simpleJobScheduler(final SimpleJob simpleJob,
@Value("${simpleJob.cron}") final String cron,
@Value("${simpleJob.shardingTotalCount}") final int shardingTotalCount,
@Value("${simpleJob.shardingItemParameters}") final String shardingItemParameters) {
return new SpringJobScheduler(simpleJob, regCenter,
getLiteJobConfiguration(simpleJob.getClass(), cron, shardingTotalCount, shardingItemParameters));
}
/**
* 作业的配置
* */
private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass, final String cron,
final int shardingTotalCount, final String shardingItemParameters) {
return LiteJobConfiguration.newBuilder(
new SimpleJobConfiguration(
JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount)
.shardingItemParameters(shardingItemParameters).build(), jobClass.getCanonicalName())).overwrite(true).build();
}
}
具体的job
package com.bamboo.elasticjobdemo.job;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import java.text.SimpleDateFormat;
import java.util.Date;
public class MySimpleJob implements SimpleJob {
@Override
public void execute(ShardingContext context) {
String str = String.format("分片总数%S\t分片项%S\t分片参数%S\tjob名称%S\t",context.getShardingTotalCount(),context.getShardingItem(),
context.getShardingParameter(),context.getJobName()
);
System.out.println(str);
switch (context.getShardingItem()){
case 0:
System.out.println(context.getShardingItem()+"参数:"+context.getShardingParameter());
break;
case 1:
System.out.println(context.getShardingItem()+"参数:"+context.getShardingParameter());
break;
case 2:
System.out.println(context.getShardingItem()+"参数:"+context.getShardingParameter());
break;
case 3:
System.out.println(context.getShardingItem()+"参数:"+context.getShardingParameter());
break;
default:
System.out.println("defalt参数:"+context.getShardingParameter());
break;
}
System.out.println(
String.format("分片项 ShardingItem: %s | 运行时间: %s | 线程ID: %s | 分片参数: %s ",
context.getShardingItem(),
new SimpleDateFormat("HH:mm:ss").format(new Date()),
Thread.currentThread().getId(),
context.getShardingParameter())
);
}
}
启动application,一分钟后修改端口号在启动一个实例
这个时候你观察两个控制台打印的信息,就会和上面讲述的一致
实际业务逻辑处理
在实际业务处理中待处理的数据比如状态为待支付=0并且支付时间pay_time<now()的数据要根据节点取余动态的分给不同的节点处理,而不能像之前的定时任务去掉%部分的筛选条件,会造成多个节点多次同时处理的情况
select * from t_order where pay_time < #{now} and status =0 and id% #{TotalCount} =#{ShardingItem}
因为同一个节点拥有的多个分片项就会有几个线程同时执行,为了每个节点的各个分配项执行线程任务时不冲突,必须加上具体的分片项值
这样业务部分就可以写出如下方式
@Override
public void execute(ShardingContext shardingContext) {
Date now = New Date();
List<Order> orderList = orderService.getExpireOrder(now,shardingContext.getShardingTotalCount(),shardingContext.getShardingItem());
if(orders !=null && orders.size()>0){
List<Long> orderIds = orderList.stream().map(Order::getId).collect(Collectors.toList())
orderService.cancelOrder(orderIds,updateTime,status,updateUser,updateNow);
}
}
警告
elasticjob已经5年没有更新了,能不用建议尽量不要用
如果一定要用elasticjob,建议单独创建一个子项目,否则当大量依赖jar时,会出现版本冲突,特别是新项目依赖新版本的spring cloud等其他基础组件时,你可能无法解决版本问题,以至于项目无法启动。
参考
【30分钟未支付订单自动取消】SpringBoot整合Elastic-Job分布式定时任务框架
https://blog.csdn.net/qq_32370913/article/details/106572649
任务调度 ElasticJob 视频教程全集
https://www.bilibili.com/video/BV1Dt411g7E6?p=2
更多推荐
所有评论(0)