ElasticJob分布式调度,使用注册中心zookeeper开启固态定时任务附源码(二)

问题背景

上一篇介绍了ElasticJob分布式调度的基本概念,本篇介绍如何使用zookeeper注册中心开启分布式固态定时任务
注意事项:

项目搭建

1 引入pom依赖,可以选择com.github.kuhn-he的elastic-job-lite-spring-boot-starter(已包含所需的全部依赖),或者选择com.dangdang的elastic-job-lite-core和elastic-job-lite-spring

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <!-- The Spring Boot parent manages dependency and plugin versions for this build -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.yg</groupId>
    <artifactId>dangdang-elasticjob</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>dangdang-elasticjob</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- Starter that provides the ElasticJobAutoConfiguration auto-configuration class -->
        <dependency>
            <groupId>com.github.kuhn-he</groupId>
            <artifactId>elastic-job-lite-spring-boot-starter</artifactId>
            <version>2.1.5</version>
        </dependency>
        <!-- elastic-job: alternative com.dangdang artifacts, kept for reference -->
<!--        <dependency>-->
<!--            <groupId>com.dangdang</groupId>-->
<!--            <artifactId>elastic-job-lite-core</artifactId>-->
<!--            <version>2.1.5</version>-->
<!--        </dependency>-->
<!--        <dependency>-->
<!--            <groupId>com.dangdang</groupId>-->
<!--            <artifactId>elastic-job-lite-spring</artifactId>-->
<!--            <version>2.1.5</version>-->
<!--        </dependency>-->
        <!-- elastic-job  end -->
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <!-- No explicit version: inherit the one managed by spring-boot-starter-parent
                     so the plugin always matches the Spring Boot version declared above -->
                <configuration>
                    <excludes>
                        <!-- Lombok is only needed at compile time; keep it out of the repackaged jar -->
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

2 application.properties,添加端口号和zookeeper地址,因为我的zookeeper是单机模式,所以只配置了一个地址

# Port the web application listens on
server.port=1995

# ZooKeeper connection string (host:port); a single standalone node is configured here.
# NOTE(review): the prefix is spelled "elaticjob" (sic). ElasticJobConfig reads these exact
# keys via @Value, so do not "correct" the spelling here without changing it there as well.
elaticjob.zookeeper.server-lists=10.10.195.193:2181
# ZooKeeper namespace handed to ZookeeperConfiguration together with the server list
elaticjob.zookeeper.namespace=my-project

3 ElasticJob配置,主要进行加载和初始化

package com.yg.dangdangelasticjob.config;


import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;

/**
 * Spring configuration that exposes the ZooKeeper registry center used by ElasticJob-Lite
 * and offers a helper for registering {@link SimpleJob} implementations at runtime.
 *
 * @Author suolong
 * @Date 2022/4/14 16:03
 * @Version 1.5
 */
@Configuration
public class ElasticJobConfig {

    /**
     * Creates the ZooKeeper-backed registry center. Spring calls {@code init()} on the
     * returned object once it has been constructed (see {@code initMethod}).
     *
     * @param serverList value of the {@code elaticjob.zookeeper.server-lists} property
     * @param namespace  value of the {@code elaticjob.zookeeper.namespace} property
     * @return the registry center built from the two settings
     */
    @Bean(initMethod = "init")
    public ZookeeperRegistryCenter regCenter(@Value("${elaticjob.zookeeper.server-lists}") final String serverList, @Value("${elaticjob.zookeeper.namespace}") final String namespace) {
        return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
    }

    // Registry center injected by Spring; used by addSimpleJobScheduler below.
    @Autowired
    private ZookeeperRegistryCenter regCenter;

    /**
     * Registers and starts a simple job dynamically.
     *
     * <p>The job is named after the class's binary name ({@link Class#getName()}), and the
     * same binary name is passed as the job class. The binary name is used instead of the
     * canonical name because {@link Class#getCanonicalName()} returns {@code null} for
     * anonymous/local classes and a dotted form for nested classes that cannot be loaded
     * by name.
     *
     * @param jobClass               job implementation to schedule
     * @param cron                   cron expression that triggers the job
     * @param shardingTotalCount     total number of sharding items
     * @param shardingItemParameters per-item parameters, e.g. {@code "0=A,1=B"}
     */
    public void addSimpleJobScheduler(final Class<? extends SimpleJob> jobClass,
                                      final String cron,
                                      final int shardingTotalCount,
                                      final String shardingItemParameters){
        // Binary name: unlike the canonical name it is never null and is loadable for nested classes.
        final String jobClassName = jobClass.getName();
        JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder(jobClassName, cron, shardingTotalCount)
                .shardingItemParameters(shardingItemParameters)
                .jobParameter("job参数")
                .build();
        SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(coreConfig, jobClassName);
        LiteJobConfiguration liteJobConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).build();
        // Build the scheduler against the shared registry center and start it.
        new JobScheduler(regCenter, liteJobConfig).init();
    }
}

4 实现ElasticJob的SimpleJob接口,并通过@ElasticSimpleJob注解配置定时任务

package com.yg.dangdangelasticjob.elasticjob;

import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.elasticjob.lite.annotation.ElasticSimpleJob;
import org.springframework.stereotype.Component;

/**
 * Demo simple job that prints the sharding details it receives on every trigger.
 *
 * <p>The annotation configures cron {@code 0/3 * * * * ?}, job name {@code test123},
 * two sharding items with parameters {@code 0=A,1=B}, and a job parameter.
 *
 * @Author suolong
 * @Date 2022/4/14 21:40
 * @Version 1.5
 */
@Component
@ElasticSimpleJob(cron = "0/3 * * * * ?", jobName = "test123", shardingTotalCount = 2, jobParameter = "测试参数", shardingItemParameters = "0=A,1=B")
public class ScanDataxAsync implements com.dangdang.ddframe.job.api.simple.SimpleJob {

    /** Template of the single line printed for each execution. */
    private static final String REPORT_FORMAT =
            "------Thread ID: %s, 任务总片数: %s, 当前分片项: %s.当前参数: %s,当前任务名称: %s.当前任务参数: %s";

    /**
     * Prints the current thread id together with the sharding total count, sharding item,
     * sharding parameter, job name and job parameter taken from the given context.
     *
     * @param shardingContext context supplied for this execution
     */
    @Override
    public void execute(ShardingContext shardingContext) {
        // Values are gathered in the same order as the placeholders in REPORT_FORMAT.
        final Object[] reportFields = {
                Thread.currentThread().getId(),
                shardingContext.getShardingTotalCount(),
                shardingContext.getShardingItem(),
                shardingContext.getShardingParameter(),
                shardingContext.getJobName(),
                shardingContext.getJobParameter()
        };
        final String report = String.format(REPORT_FORMAT, reportFields);
        System.out.println(report);
    }
}
//每三秒执行一次,任务名称为test123,共开启两个分片,任务参数为"测试参数",分片0的参数为A,分片1的参数为B
@ElasticSimpleJob(cron = "0/3 * * * * ?", jobName = "test123", shardingTotalCount = 2, jobParameter = "测试参数", shardingItemParameters = "0=A,1=B")

5 项目启动类

package com.yg.dangdangelasticjob;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Entry point of the demo: boots the Spring application context for this project.
 */
@SpringBootApplication
public class DangdangElasticjobApplication {

    /**
     * Starts the Spring Boot application.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        // Instance form of the static SpringApplication.run(Class, String...) convenience call.
        final SpringApplication application = new SpringApplication(DangdangElasticjobApplication.class);
        application.run(args);
    }

}

6 整体项目文件目录

代码测试

1 启动项目,可以看见有两个分片在工作

总结

  • 最近项目的需求,分布式集群部署跑起来了




作为程序员第 111 篇文章,每次写一句歌词记录一下,看看人生有几首歌的时间,wahahaha …

Lyric: 是我给的承诺

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐