Elastic-job-分布式定时任务框架简介和使用
一.简介Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供最轻量级的分布式任务的协调服务,外部依赖仅Zookeeper。二.基础概念1. 分片概念任务的分布式执行,需要将一个任务拆分为多个独立的任务项,然后由分布式的服务器分别执行某一个或几个分片项。例如:有一个遍历数据库某张表的作业,现有2台服务器。为了快速的执行作业,那么每台服务器应执行作业...
一.简介
Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供最轻量级的分布式任务的协调服务,外部依赖仅Zookeeper。
二.基础概念
1. 分片概念
任务的分布式执行,需要将一个任务拆分为多个独立的任务项,然后由分布式的服务器分别执行某一个或几个分片项。
例如:有一个遍历数据库某张表的作业,现有2台服务器。为了快速的执行作业,那么每台服务器应执行作业的50%。 为满足此需求,可将作业分成2片,每台服务器执行1片。作业遍历数据的逻辑应为:服务器A遍历ID以奇数结尾的数据;服务器B遍历ID以偶数结尾的数据。 如果分成10片,则作业遍历数据的逻辑应为:每片分到的分片项应为ID%10,而服务器A被分配到分片项0,1,2,3,4;服务器B被分配到分片项5,6,7,8,9,直接的结果就是服务器A遍历ID以0-4结尾的数据;服务器B遍历ID以5-9结尾的数据。
2. 分片项与业务处理解耦
Elastic-Job并不直接提供数据处理的功能,框架只会将分片项分配至各个运行中的作业服务器,开发者需要自行处理分片项与真实数据的对应关系。
3. 个性化参数的适用场景
个性化参数即shardingItemParameter,可以和分片项匹配对应关系,用于将分片项的数字转换为更加可读的业务代码。
例如:按照地区水平拆分数据库,数据库A是北京的数据;数据库B是上海的数据;数据库C是广州的数据。 如果仅按照分片项配置,开发者需要了解0表示北京;1表示上海;2表示广州。 合理使用个性化参数可以让代码更可读,如果配置为0=北京,1=上海,2=广州,那么代码中直接使用北京,上海,广州的枚举值即可完成分片项和业务逻辑的对应关系。
三.核心概念
1. 分布式调度
Elastic-Job-Lite并无作业调度中心节点,而是基于部署作业框架的程序在到达相应时间点时各自触发调度。
注册中心仅用于作业注册和监控信息存储。而主作业节点仅用于处理分片和清理等功能。
2. 作业高可用
Elastic-Job-Lite提供最安全的方式执行作业。将分片总数设置为1,并使用多于1台的服务器执行作业,作业将会以1主n从的方式执行。
一旦执行作业的服务器崩溃,等待执行的服务器将会在下次作业启动时替补执行。开启失效转移功能效果更好,可以保证在本次作业执行时崩溃,备机立即启动替补执行。
3. 最大限度利用资源
Elastic-Job-Lite也提供最灵活的方式,最大限度的提高执行作业的吞吐量。将分片项设置为大于服务器的数量,最好是大于服务器倍数的数量,作业将会合理的利用分布式资源,动态的分配分片项。
例如:3台服务器,分成10片,则分片项分配结果为服务器A=0,1,2;服务器B=3,4,5;服务器C=6,7,8,9。 如果服务器C崩溃,则分片项分配结果为服务器A=0,1,2,3,4;服务器B=5,6,7,8,9。在不丢失分片项的情况下,最大限度的利用现有资源提高吞吐量。
四.整体架构
四.实现原理
弹性分布式实现
-
第一台服务器上线触发主服务器选举。主服务器一旦下线,则重新触发选举,选举过程中阻塞,只有主服务器选举完成,才会执行其他任务。
-
某作业服务器上线时会自动将服务器信息注册到注册中心,下线时会自动更新服务器状态。
-
主节点选举,服务器上下线,分片总数变更均更新重新分片标记。
-
定时任务触发时,如需重新分片,则通过主服务器分片,分片过程中阻塞,分片结束后才可执行任务。如分片过程中主服务器下线,则先选举主服务器,再分片。
-
通过上一项说明可知,为了维持作业运行时的稳定性,运行过程中只会标记分片状态,不会重新分片。分片仅可能发生在下次任务触发前。
-
每次分片都会按服务器IP排序,保证分片结果不会产生较大波动。
-
实现失效转移功能,在某台服务器执行完毕后主动抓取未分配的分片,并且在某台服务器下线后主动寻找可用的服务器执行任务。
注册中心数据结构
注册中心在定义的命名空间下,创建作业名称节点,用于区分不同作业,所以作业一旦创建则不能修改作业名称,如果修改名称将视为新的作业。作业名称节点下又包含4个数据子节点,分别是config, instances, sharding, servers和leader。
config节点
作业配置信息,以JSON格式存储
instances节点
作业运行实例信息,子节点是当前作业运行实例的主键。作业运行实例主键由作业运行服务器的IP地址和PID构成。作业运行实例主键均为临时节点,当作业实例上线时注册,下线时自动清理。注册中心监控这些节点的变化来协调分布式作业的分片以及高可用。 可在作业运行实例节点写入TRIGGER表示该实例立即执行一次。
sharding节点
作业分片信息,子节点是分片项序号,从零开始,至分片总数减一。分片项序号的子节点存储详细信息。每个分片项下的子节点用于控制和记录分片运行状态。节点详细信息说明:
子节点名 | 临时节点 | 描述 |
---|---|---|
instance | 否 | 执行该分片项的作业运行实例主键 |
running | 是 | 分片项正在运行的状态 仅配置monitorExecution时有效 |
failover | 是 | 如果该分片项被失效转移分配给其他作业服务器,则此节点值记录执行此分片的作业服务器IP |
misfire | 否 | 是否开启错过任务重新执行 |
disabled | 否 | 是否禁用此分片项 |
servers节点
作业服务器信息,子节点是作业服务器的IP地址。可在IP地址节点写入DISABLED表示该服务器禁用。 在新的cloud native架构下,servers节点大幅弱化,仅包含控制服务器是否可以禁用这一功能。为了更加纯粹的实现job核心,servers功能未来可能删除,控制服务器是否禁用的能力应该下放至自动化部署系统。
leader节点
作业服务器主节点信息,分为election,sharding和failover三个子节点。分别用于主节点选举,分片和失效转移处理。
leader节点是内部使用的节点,如果对作业框架原理不感兴趣,可不关注此节点。
子节点名 | 临时节点 | 描述 |
---|---|---|
election\instance | 是 | 主节点服务器IP地址 一旦该节点被删除将会触发重新选举 重新选举的过程中一切主节点相关的操作都将阻塞 |
election\latch | 否 | 主节点选举的分布式锁 为curator的分布式锁使用 |
sharding\necessary | 否 | 是否需要重新分片的标记 如果分片总数变化,或作业服务器节点上下线或启用/禁用,以及主节点选举,会触发设置重分片标记 作业在下次执行时使用主节点重新分片,且中间不会被打断 作业执行时不会触发分片 |
sharding\processing | 是 | 主节点在分片时持有的节点 如果有此节点,所有的作业执行都将阻塞,直至分片结束 主节点分片结束或主节点崩溃会删除此临时节点 |
failover\items\分片项 | 否 | 一旦有作业崩溃,则会向此节点记录 当有空闲作业服务器时,会从此节点抓取需失效转移的作业项 |
failover\items\latch | 否 | 分配失效转移分片项时占用的分布式锁 为curator的分布式锁使用 |
流程图
作业启动
作业执行
五.spring方式使用
1.添加Maven依赖
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>2.1.5</version>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>2.1.5</version>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-common-core</artifactId>
<version>2.1.5</version>
</dependency>
2.spring-quartz.xml文件配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
xmlns:job="http://www.dangdang.com/schema/ddframe/job"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.dangdang.com/schema/ddframe/reg
http://www.dangdang.com/schema/ddframe/reg/reg.xsd
http://www.dangdang.com/schema/ddframe/job
http://www.dangdang.com/schema/ddframe/job/job.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.2.xsd
">
<!--引入配置属性文件 ignore-unresolvable解析$-->
<context:property-placeholder location="classpath:config.properties" ignore-unresolvable="true"/>
<!--配置作业注册中心 -->
<reg:zookeeper id="regCenter" server-lists="${server.lists}"
namespace="light-cloud-quartz" base-sleep-time-milliseconds="1000"
max-sleep-time-milliseconds="3000" max-retries="3"/>
<!-- 配置作业-->
<job:simple id="demoSimpleSpringJob" class="com.lightcloud.quartz.MyElasticJob" registry-center-ref="regCenter"
cron="0/5 * * * * ?" sharding-total-count="2" sharding-item-parameters="0=A,1=B" failover="true"
overwrite="true" monitor-port="9888"/>
</beans>
3.作业java类
package com.lightcloud.quartz;
import com.alibaba.dubbo.config.annotation.Reference;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.lightcloud.commodity.api.CommodityApi;
import lombok.extern.slf4j.Slf4j;
/**
* 定时任务测试
* @author
* @date 2018-07-24
*/
@Slf4j
public class MyElasticJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
String shardingParameter = shardingContext.getShardingParameter();
switch (shardingContext.getShardingItem()) {
case 0:
// do something by sharding item
log.info("===========定时任务0==========="+shardingParameter);
default:
log.info("===========default==========="+shardingParameter);
break;
}
}
}
六.其他
其他具体详细配置请查看官网,地址为:http://elasticjob.io/docs/elastic-job-lite/00-overview/
更多推荐
所有评论(0)