elasticjob

执行体中最重要的两个参数是;分片总数,分片项。
分片总数:集群的节点总数(类似于银行办理业务窗口的总柜台数)
分片项:把每个排序的话,默认从0开始,最大值不会超过分片总数,业务受理时会拿到的业务编号(类似于你去银行柜台办理业务,你拿到的排队编号)

假设我们规定排队总长度只有10个,但是启动的服务节点只有三个,那么elasticjob会给每个节点根据zookeeper实际节点数取余N%3;
那么三个节点可能受理的编号情况是,
节点1包含0,1,2
节点2包含3,4,5
节点3包含6,7,8,9

这样每个节点只取它自己受理部分的编号,就不会出现重复消费和遗漏的问题了。

比如总分片数为4,两个节点,每五秒执行一次,一分钟后在启动第二个节点
他们的执行打印情况是

节点1:

分片项 ShardingItem: 0 | 运行时间: 14:18:40 | 线程ID: 58 | 分片参数: C 
分片项 ShardingItem: 1 | 运行时间: 14:18:40 | 线程ID: 59 | 分片参数: D 
分片项 ShardingItem: 2 | 运行时间: 14:18:40 | 线程ID: 60 | 分片参数: null 
分片项 ShardingItem: 3 | 运行时间: 14:18:40 | 线程ID: 61 | 分片参数: null 


分片项 ShardingItem: 0 | 运行时间: 14:20:50 | 线程ID: 72 | 分片参数: C 
分片项 ShardingItem: 1 | 运行时间: 14:20:50 | 线程ID: 73 | 分片参数: D 
分片项 ShardingItem: 0 | 运行时间: 14:20:55 | 线程ID: 60 | 分片参数: C 

节点2

分片项 ShardingItem: 2 | 运行时间: 14:56:10 | 线程ID: 66 | 分片参数: null 
分片项 ShardingItem: 3 | 运行时间: 14:56:10 | 线程ID: 67 | 分片参数: null 
分片项 ShardingItem: 3 | 运行时间: 14:56:15 | 线程ID: 69 | 分片参数: null 
分片项 ShardingItem: 2 | 运行时间: 14:56:15 | 线程ID: 68 | 分片参数: null 

可以看出节点1一开始是全部负责,后面把2,3分给了节点2处理,这样两个节点各处理两个

总数固定的情况下,随着集群数量的增加,每个节点拥有的分片项也会有变化

@Component
public class MySimpleJob implements SimpleJob {
    	//@Autowired
    	//private OrderService orderService;//业务实现接口
    @Override
    public void execute(ShardingContext shardingContext) {
        log.info("我是分片项:"+shardingContext.getShardingItem()+",总分片数是:"+
                shardingContext.getShardingTotalCount());
         
    }
} 

springBoot中使用

生成一个springboot项目
https://start.spring.io/

pom

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.4.5</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.bamboo</groupId>
	<artifactId>elastic-job-demo</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>elastic-job-demo</name>
	<description>Demo project for Spring Boot</description>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<java.version>1.8</java.version>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>


		<dependency>
			<groupId>com.dangdang</groupId>
			<artifactId>elastic-job-lite-spring</artifactId>
			<version>2.1.5</version>
		</dependency>

		<dependency>
			<artifactId>elastic-job-lite-core</artifactId>
			<groupId>com.dangdang</groupId>
			<version>2.1.5</version>
		</dependency>


	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project>

application.properties

server.port= 8082


# zookeeper config
regCenter.serverList=localhost:2181
regCenter.namespace=demo009

# 每隔20秒执行
mailSendJob.cron=0/5 * * * * ?

# 总分片数
shardingCategory.shardingTotalCount=10
shardingCategory.shardingItemParameters=0=A,1=B,2=C,3=D,4=E,5=F,6=G,7=H,8=I,9=J



simpleJob.cron= 0/5 * * * * ?
simpleJob.shardingTotalCount= 4
simpleJob.shardingItemParameters= 0=A,1=B,0=C,1=D

job

zookeeper配置信息初始化

package com.bamboo.elasticjobdemo.config;

import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConditionalOnExpression("'${regCenter.serverList}'.length() > 0")
public class    ZookeeperRegistryCenterConfig {

    @Bean(initMethod = "init")
    public ZookeeperRegistryCenter regCenter(@Value("${regCenter.serverList}") final String serverList,
            @Value("${regCenter.namespace}") final String namespace) {
        return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
    }

}

job配置初始化

package com.bamboo.elasticjobdemo.config;

import com.bamboo.elasticjobdemo.job.MySimpleJob;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;

@Configuration
public class MySimpleJobConfig {

    @Resource
    private ZookeeperRegistryCenter regCenter;

    /**
     * 自己实现的job
     * */
    @Bean
    public SimpleJob simpleJob() {
        return new MySimpleJob();
    }


    /**
     * 将自己实现的job加入调度中执行
     * */
    @Bean(initMethod = "init")
    public JobScheduler simpleJobScheduler(final SimpleJob simpleJob,
                                           @Value("${simpleJob.cron}") final String cron,
                                           @Value("${simpleJob.shardingTotalCount}") final int shardingTotalCount,
                                           @Value("${simpleJob.shardingItemParameters}") final String shardingItemParameters) {
        return new SpringJobScheduler(simpleJob, regCenter,
                getLiteJobConfiguration(simpleJob.getClass(), cron, shardingTotalCount, shardingItemParameters));

    }


    /**
     * 作业的配置
     * */
    private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass, final String cron,
                                                         final int shardingTotalCount, final String shardingItemParameters) {

        return LiteJobConfiguration.newBuilder(
                new SimpleJobConfiguration(
                        JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount)
                                .shardingItemParameters(shardingItemParameters).build(), jobClass.getCanonicalName())).overwrite(true).build();

    }


}

具体的job

package com.bamboo.elasticjobdemo.job;

import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;

import java.text.SimpleDateFormat;
import java.util.Date;


public class MySimpleJob implements SimpleJob {

    @Override
    public void execute(ShardingContext context) {

        String str = String.format("分片总数%S\t分片项%S\t分片参数%S\tjob名称%S\t",context.getShardingTotalCount(),context.getShardingItem(),
                context.getShardingParameter(),context.getJobName()
        );

        System.out.println(str);

        switch (context.getShardingItem()){
            case 0:
                System.out.println(context.getShardingItem()+"参数:"+context.getShardingParameter());
                break;
            case 1:
                System.out.println(context.getShardingItem()+"参数:"+context.getShardingParameter());
                break;
            case 2:
                System.out.println(context.getShardingItem()+"参数:"+context.getShardingParameter());
                break;
            case 3:
                System.out.println(context.getShardingItem()+"参数:"+context.getShardingParameter());
                break;
            default:
                System.out.println("defalt参数:"+context.getShardingParameter());
                break;

        }



        System.out.println(
                String.format("分片项 ShardingItem: %s | 运行时间: %s | 线程ID: %s | 分片参数: %s ",
                        context.getShardingItem(),
                        new SimpleDateFormat("HH:mm:ss").format(new Date()),
                        Thread.currentThread().getId(),
                        context.getShardingParameter())
        );
    }
}

启动application,一分钟后修改端口号在启动一个实例
这个时候你观察两个控制台打印的信息,就会和上面讲述的一致

实际业务逻辑处理

在实际业务处理中待处理的数据比如状态为待支付=0并且支付时间pay_time<now()的数据要根据节点取余动态的分给不同的节点处理,而不能像之前的定时任务去掉%部分的筛选条件,会造成多个节点多次同时处理的情况

select *  from t_order where  pay_time < #{now}  and status =0   and id% #{TotalCount} =#{ShardingItem}

因为同一个节点拥有的多个分片项就会有几个线程同时执行,为了每个节点的各个分配项执行线程任务时不冲突,必须加上具体的分片项值

这样业务部分就可以写出如下方式

@Override
    public void execute(ShardingContext shardingContext) {
    
			Date now = New Date();
			List<Order> orderList = orderService.getExpireOrder(now,shardingContext.getShardingTotalCount(),shardingContext.getShardingItem());
			if(orders !=null && orders.size()>0){
			List<Long> orderIds = orderList.stream().map(Order::getId).collect(Collectors.toList())
			orderService.cancelOrder(orderIds,updateTime,status,updateUser,updateNow);
			}

}

警告

elasticjob已经5年没有更新了,能不用建议尽量不要用

如果一定要用elasticjob,建议单独创建一个子项目,否则当大量依赖jar时,会出现版本冲突,特别是新项目依赖新版本的spring cloud等其他基础组件时,你可能无法解决版本问题,以至于项目无法启动。

参考

【30分钟未支付订单自动取消】SpringBoot整合Elastic-Job分布式定时任务框架
https://blog.csdn.net/qq_32370913/article/details/106572649
任务调度 ElasticJob 视频教程全集
https://www.bilibili.com/video/BV1Dt411g7E6?p=2

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐