在上一篇博客Elastic-Job原理--简介与示例(一)中我们简单的介绍了一下Elastic-Job提供的功能,这篇博客我们通过分析Elastic-Job的源码,了解学习一下Elastic-Job的初始化、节点选举、配置变更通知等相关的流程。

     Elastic-Job依赖Zookeeper作为注册中心,利用zk的功能完成节点选举、分片和配置变更等相关的功能,接下来我们通过分析源码来了解一下Elastic-Job的节点选举机制。

节点启动注册及选举机制:

任务初始化

Elastic-Job在构造任务JobScheduler时会进行初始化后将任务名称添加到zk的命名空间中。

JobScheduler(regCenter, LiteJobConfiguration.newBuilder(simpleJobConfig).build(), jobEventConfig).init();

在init方法中会完成任务注册和节点选举操作。

public void init() {
        LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig);
        JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(), liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());
        JobScheduleController jobScheduleController = new JobScheduleController(
                createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName());
        //注册任务
        JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter);
        //添加任务信息并进行节点选举
        schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());
        jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());
    }

任务注册:

调用JobRegistry的registerJob方法进行任务注册

   /**
     * 添加作业调度控制器.
     * 
     * @param jobName 作业名称
     * @param jobScheduleController 作业调度控制器
     * @param regCenter 注册中心
     */
    public void registerJob(final String jobName, final JobScheduleController jobScheduleController, final CoordinatorRegistryCenter regCenter) {
        schedulerMap.put(jobName, jobScheduleController);
        regCenterMap.put(jobName, regCenter);
        regCenter.addCacheData("/" + jobName);
    }

在registerJob方法中会调用ZookeeperRegistryCenter的addCacheData方法将任务名称作为节点名称写到zk中

    @Override
    public void addCacheData(final String cachePath) {
        TreeCache cache = new TreeCache(client, cachePath);
        try {
            //与zk建立连接并将cachePath写到zk中
            cache.start();
        //CHECKSTYLE:OFF
        } catch (final Exception ex) {
        //CHECKSTYLE:ON
            RegExceptionHandler.handleException(ex);
        }
        caches.put(cachePath + "/", cache);
    }

节点选举

在init初始化方法中调用如下方法进行节点选举等操作。

schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());

在registerStartUpInfo中做了如下操作:

    /**
     * 注册作业启动信息.
     * 
     * @param enabled 作业是否启用
     */
    public void registerStartUpInfo(final boolean enabled) {
        //启动所以的监听器
        listenerManager.startAllListeners();
        //节点选举
        leaderService.electLeader();
        //服务信息持久化
        serverService.persistOnline(enabled);
        //实例信息持久化
        instanceService.persistOnline();
        //重新分片
        shardingService.setReshardingFlag();
        //监控信息监听器
        monitorService.listen();
        if (!reconcileService.isRunning()) {
            reconcileService.startAsync();
        }
    }

(1)ListenerManager启动所有监听器

     /**
     * 开启所有监听器.
     */
    public void startAllListeners() {
    	//主节点选举监听管理器.
        electionListenerManager.start();
        //分片监听管理器.
        shardingListenerManager.start();
        //失效转移监听管理器.
        failoverListenerManager.start();
        //幂等性监听管理器.
        monitorExecutionListenerManager.start();
        //运行实例关闭监听管理器.
        shutdownListenerManager.start();
        //作业触发监听管理器.
        triggerListenerManager.start();
        //重调度监听管理器.
        rescheduleListenerManager.start();
        //保证分布式任务全部开始和结束状态监听管理器.保证分布式任务全部开始和结束状态监听管理器.
        guaranteeListenerManager.start();
        //注册连接状态监听器.
        jobNodeStorage.addConnectionStateListener(regCenterConnectionStateListener);
    }

(2)LeaderService类中节点选举

在LeaderService方法中调用electLeader方法进行节点选举,在路径中写入leader/election/latch,如果选举成功在在leader/election/instance路径中填写服务器信息。

    /**
     * 选举主节点.
     */
    public void electLeader() {
        log.debug("Elect a new leader now.");
        //leader/election/latch
        jobNodeStorage.executeInLeader(LeaderNode.LATCH, new LeaderElectionExecutionCallback());
        log.debug("Leader election completed.");
    }

在JobNodeStoreage中调用executeInLeader方法,使用路径leader/election/latch,如果获取这个路径则调用LeaderExecutionCallback回调函数,执行execute方法。

     /**
     * 在主节点执行操作.
     * 
     * @param latchNode 分布式锁使用的作业节点名称
     * @param callback 执行操作的回调
     */
    public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) {
        try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) {
            latch.start();
            latch.await();
            callback.execute();
        //CHECKSTYLE:OFF
        } catch (final Exception ex) {
        //CHECKSTYLE:ON
            handleException(ex);
        }
    }

在LeaderExecutionCallback的execute方法中会判断是否选举为主节点,如果选举为主节点则将服务器信息添加到leader/election/instace路径中

    @RequiredArgsConstructor
    class LeaderElectionExecutionCallback implements LeaderExecutionCallback {
        
        @Override
        public void execute() {
            if (!hasLeader()) {
                //将服务器信息添加到leader/election/instance节点中
                jobNodeStorage.fillEphemeralJobNode(LeaderNode.INSTANCE, JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
            }
        }
    }

    /**
     * 判断是否已经有主节点.
     * 
     * @return 是否已经有主节点
     */
    public boolean hasLeader() {
        return jobNodeStorage.isJobNodeExisted(LeaderNode.INSTANCE);
    }

经过以上处理就完成了主节点选举操作。

(3)将服务信息添加到zk中

在ServerService方法中调用persistOnline将服务器信息添加到zk中,

    /**
     * 持久化作业服务器上线信息.
     * 
     * @param enabled 作业是否启用
     */
    public void persistOnline(final boolean enabled) {
        if (!JobRegistry.getInstance().isShutdown(jobName)) {
            jobNodeStorage.fillJobNode(serverNode.getServerNode(JobRegistry.getInstance().getJobInstance(jobName).getIp()), enabled ? "" : ServerStatus.DISABLED.name());
        }
    }

(4)将实例信息添加到zk的instances节点中

在InstanceService中调用persistOnline方法将实例的信息初始化到zk的instances节点中

    /**
     * 持久化作业运行实例上线相关信息.
     */
    public void persistOnline() {
        jobNodeStorage.fillEphemeralJobNode(instanceNode.getLocalInstanceNode(), "");
    }

(5)设置节点任务重新分片

在ShardingService中调用setReshardingFlag方法,在节点sharding写创建necessary节点,通知主节点进行任务分片处理。

    /**
     * 设置需要重新分片的标记.
     */
    public void setReshardingFlag() {
        jobNodeStorage.createJobNodeIfNeeded(ShardingNode.NECESSARY);
    }

变更通知

Elastic-Job使用zk节点信息变化通知的机制,创建了很多监听器Listener,实现了接口TreeCacheListener创建抽象类AbstractJobListener,抽象类AbstractJobListener有很多实现类,分别进行不同的业务处理。

主要有以下实现类:

(1)CompletedNodeRemovedJobListener:保证分布式任务全部开始和结束状态监听管理器.

(2)CronSettingAndJobEventChangedJobListener:重调度监听管理器.

(3)FailoverSettingsChangedJobListener:失效转移监听管理器.

(4)InstanceShutdownStatusJobListener:运行实例关闭监听管理器.

(5)JobCrashedJobListener:失效转移监听管理器.

(6)JobTriggerStatusJobListener:作业触发监听管理器.

(7)LeaderElectionJobListener:主节点选举监听管理器.

(8)LeaderAbdicationJobListener:主节点选举监听管理器.

(9)ListenServersChangedJobListener:分片监听管理器.

(10)MonitorExecutionSettingsChangedJobListener:幂等性监听管理器.

(11)ShardingTotalCountChangedJobListener:分片监听管理器.

(12)StartedNodeRemovedJobListener:保证分布式任务全部开始和结束状态监听管理器.

 

 

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐