xxl-job源码解读:路由策略ExecutorRoute

本文基于xxl-job的2.3.1版本
基于源码介绍定时任务的路由选取策略
基本说明
路由策略在创建任务时选择,页面上的默认值为【第一个】。
当任务的执行器路由地址只有一个时,不需要进行路由判断,建议选取策略【第一个】。
当执行器地址有多个时,会根据选择的路由策略,进行执行路由选择,选取一个地址进行调度。
其中【分片广播】比较特殊,会对执行器下所有路由进行调用执行。
代码功能解读
xxl-job触发器代码主要在 com.xxl.job.admin.core.route
包下。
路由策略的实现,除了【分片广播】策略的逻辑被写在了触发器中,其他的均定义在com.xxl.job.admin.core.route.strategy
下
以下策略介绍不包含分片广播
包含的路由策略如下图:
路由策略选取
通过枚举类定义路由策略(除了分片广播)ExecutorRouteStrategyEnum
,并将策略类 router
与枚举类型进行绑定。
通过枚举类型直接能获取到对应路由策略的实现类。
这里可以看到,因为分片广播的逻辑代码放在了触发器 XxlJobTrigger
中,没有对应的实现类,所以为null。
/**
* 执行器路由选取策略
*
* @author xuxueli on 17/3/10.
*/
public enum ExecutorRouteStrategyEnum {
FIRST(I18nUtil.getString("jobconf_route_first"), new ExecutorRouteFirst()),
LAST(I18nUtil.getString("jobconf_route_last"), new ExecutorRouteLast()),
ROUND(I18nUtil.getString("jobconf_route_round"), new ExecutorRouteRound()),
RANDOM(I18nUtil.getString("jobconf_route_random"), new ExecutorRouteRandom()),
CONSISTENT_HASH(I18nUtil.getString("jobconf_route_consistenthash"), new ExecutorRouteConsistentHash()),
LEAST_FREQUENTLY_USED(I18nUtil.getString("jobconf_route_lfu"), new ExecutorRouteLFU()),
LEAST_RECENTLY_USED(I18nUtil.getString("jobconf_route_lru"), new ExecutorRouteLRU()),
FAILOVER(I18nUtil.getString("jobconf_route_failover"), new ExecutorRouteFailover()),
BUSYOVER(I18nUtil.getString("jobconf_route_busyover"), new ExecutorRouteBusyover()),
SHARDING_BROADCAST(I18nUtil.getString("jobconf_route_shard"), null);
ExecutorRouteStrategyEnum(String title, ExecutorRouter router) {
this.title = title;
this.router = router;
}
private final String title;
/**
* 路由策略实现类
*/
private final ExecutorRouter router;
public String getTitle() {
return title;
}
public ExecutorRouter getRouter() {
return router;
}
public static ExecutorRouteStrategyEnum match(String name, ExecutorRouteStrategyEnum defaultItem) {
if (name != null) {
for (ExecutorRouteStrategyEnum item : ExecutorRouteStrategyEnum.values()) {
if (item.name().equals(name)) {
return item;
}
}
}
return defaultItem;
}
}
路由策略接口
这里使用了策略模式,策略的实现类通过实现 ExecutorRouter
抽象类,绑定枚举进行使用。
如果使用中有什么特殊的选举需求,可以通过实现ExecutorRouter
进行拓展。
/**
* 路由策略抽象类
*
* @author xuxueli on 17/3/10.
*/
public abstract class ExecutorRouter {
protected static Logger logger = LoggerFactory.getLogger(ExecutorRouter.class);
/**
* route address
*
* @param addressList 执行器的路由地址配置
* @return ReturnT.content=address
*/
public abstract ReturnT<String> route(TriggerParam triggerParam, List<String> addressList);
}
路由策略-第一个
取路由地址数组中第一个
/**
* 路由策略-第一个
*
* @author xuxueli on 17/3/10.
*/
public class ExecutorRouteFirst extends ExecutorRouter {
@Override
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
return new ReturnT<>(addressList.get(0));
}
}
路由策略-最后一个
取路由地址数组中第最后一个
/**
* 路由策略-最后一个
*
* @author xuxueli on 17/3/10.
*/
public class ExecutorRouteLast extends ExecutorRouter {
@Override
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
return new ReturnT<>(addressList.get(addressList.size() - 1));
}
}
路由策略-轮询
通过一个 ConcurrentMap<Integer, AtomicInteger>
缓存记录任务ID以及执行数字,以执行数字取模作为路由地址下标,选取地址返回。
每次调用后执行数字 +1,达到路由地址轮询调用的作用。
缓存每24小时清空一次
/**
* 路由策略-轮询
*
* @author xuxueli on 17/3/10.
*/
public class ExecutorRouteRound extends ExecutorRouter {
/**
* 轮询记录 JobId -> 调用次数递增数字(初始值为100以内的随机数, 每次调用递增, 用于取模作为路由地址数组index, 决定当次调用路由)
*/
private static ConcurrentMap<Integer, AtomicInteger> routeCountEachJob = new ConcurrentHashMap<>();
private static long CACHE_VALID_TIME = 0;
private static int count(int jobId) {
// cache clear 每24小时清空一次 轮询记录缓存
if (System.currentTimeMillis() > CACHE_VALID_TIME) {
routeCountEachJob.clear();
CACHE_VALID_TIME = System.currentTimeMillis() + 1000 * 60 * 60 * 24;
}
AtomicInteger count = routeCountEachJob.get(jobId);
if (count == null || count.get() > 1000000) {
// 初始化时主动Random一次,缓解首次压力
count = new AtomicInteger(new Random().nextInt(100));
} else {
// count++ 加一并返回
count.addAndGet(1);
}
routeCountEachJob.put(jobId, count);
return count.get();
}
@Override
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
String address = addressList.get(count(triggerParam.getJobId()) % addressList.size());
return new ReturnT<>(address);
}
}
路由策略-随机
通过 Random
进行随机数获取,决定路由下标返回地址
/**
* 路由策略-随机
* <p>随机选择在线的机器</p>
*
* @author xuxueli on 17/3/10.
*/
public class ExecutorRouteRandom extends ExecutorRouter {
private static final Random localRandom = new Random();
@Override
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
String address = addressList.get(localRandom.nextInt(addressList.size()));
return new ReturnT<>(address);
}
}
路由策略-一致性Hash
通过一致性Hash,让每个Job固定调用其中一台机器,具体哪台机器根据hash值的范围选取。
同时通过循环,扩充hash的集合大小,以保证分组下机器分配job足够平均。
/**
* 分组下机器地址相同,不同JOB均匀散列在不同机器上,保证分组下机器分配JOB平均;且每个JOB固定调度其中一台机器;
* a、virtual node:解决不均衡问题
* b、hash method replace hashCode:String的hashCode可能重复,需要进一步扩大hashCode的取值范围
*
* @author xuxueli on 17/3/10.
*/
public class ExecutorRouteConsistentHash extends ExecutorRouter {
private static int VIRTUAL_NODE_NUM = 100;
/**
* get hash code on 2^32 ring (md5散列的方式计算hash值)
*
* @param key
* @return
*/
private static long hash(String key) {
// md5 byte
MessageDigest md5;
try {
md5 = MessageDigest.getInstance("MD5");
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException("MD5 not supported", e);
}
md5.reset();
byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
md5.update(keyBytes);
byte[] digest = md5.digest();
// hash code, Truncate to 32-bits
long hashCode = ((long) (digest[3] & 0xFF) << 24)
| ((long) (digest[2] & 0xFF) << 16)
| ((long) (digest[1] & 0xFF) << 8)
| (digest[0] & 0xFF);
return hashCode & 0xffffffffL;
}
public String hashJob(int jobId, List<String> addressList) {
// ------A1------A2-------A3------
// -----------J1------------------
TreeMap<Long, String> addressRing = new TreeMap<>();
// 每个复制循环100次,取hash值,让map足够大,以保证分组下机器分配JOB足够平均
for (String address : addressList) {
for (int i = 0; i < VIRTUAL_NODE_NUM; i++) {
long addressHash = hash("SHARD-" + address + "-NODE-" + i);
addressRing.put(addressHash, address);
}
}
long jobHash = hash(String.valueOf(jobId));
// 根据jobId的hash结果排序,获取出大于等于该结果的部分,存在则取这部分的第一条,不存在则选取第一个
SortedMap<Long, String> lastRing = addressRing.tailMap(jobHash);
if (!lastRing.isEmpty()) {
return lastRing.get(lastRing.firstKey());
}
return addressRing.firstEntry().getValue();
}
@Override
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
String address = hashJob(triggerParam.getJobId(), addressList);
return new ReturnT<>(address);
}
}
路由策略-最不经常使用
使用 ConcurrentMap<Integer, HashMap<String, Integer>> jobLfuMap
记录每个任务ID在路由地址下的调用次数,选取调用次数最小的路由地址。
记录的缓存每24小时清空一次。
依照实现逻辑,如果运行中修改配置,给执行器添加了个路由地址,短时间内调度会全部涌入新地址中。高频任务调度的情况下需要注意下。
/**
* 路由策略-LFU(Least Frequently Used):最不经常使用,频率/次数
* <p>
* 单个JOB对应的每个执行器,使用频率最低的优先被选举
*
* @author xuxueli on 17/3/10.
*/
public class ExecutorRouteLFU extends ExecutorRouter {
/**
* JobId -> [路由地址 -> 调用次数]
*/
private static ConcurrentMap<Integer, HashMap<String, Integer>> jobLfuMap = new ConcurrentHashMap<>();
private static long CACHE_VALID_TIME = 0;
public String route(int jobId, List<String> addressList) {
// cache clear
if (System.currentTimeMillis() > CACHE_VALID_TIME) {
jobLfuMap.clear();
CACHE_VALID_TIME = System.currentTimeMillis() + 1000 * 60 * 60 * 24;
}
// lfu item init
HashMap<String, Integer> lfuItemMap = jobLfuMap.get(jobId); // Key排序可以用TreeMap+构造入参Compare;Value排序暂时只能通过ArrayList;
if (lfuItemMap == null) {
lfuItemMap = new HashMap<>();
jobLfuMap.putIfAbsent(jobId, lfuItemMap); // 避免重复覆盖
}
// put new
for (String address : addressList) {
// 新地址,或者已经超过一百万,使用随机数进行初始化。设置上限避免无上限的递增
if (!lfuItemMap.containsKey(address) || lfuItemMap.get(address) > 1000000) {
lfuItemMap.put(address, new Random().nextInt(addressList.size())); // 初始化时主动Random一次,缓解首次压力
}
}
// remove old
List<String> delKeys = new ArrayList<>();
for (String existKey : lfuItemMap.keySet()) {
if (!addressList.contains(existKey)) {
delKeys.add(existKey);
}
}
if (delKeys.size() > 0) {
for (String delKey : delKeys) {
lfuItemMap.remove(delKey);
}
}
// load least userd count address
List<Map.Entry<String, Integer>> lfuItemList = new ArrayList<>(lfuItemMap.entrySet());
// 根据value进行排序,即根据调用次数进行正序排序,第一条则为在24小时内,调用次数最少的(不考虑随机数的差值影响)
// 排序代码可以简化为 lfuItemList.sort(Entry.comparingByValue()); -- java8
Collections.sort(lfuItemList, new Comparator<Map.Entry<String, Integer>>() {
@Override
public int compare(Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) {
return o1.getValue().compareTo(o2.getValue());
}
});
Map.Entry<String, Integer> addressItem = lfuItemList.get(0);
String minAddress = addressItem.getKey();
addressItem.setValue(addressItem.getValue() + 1);
return addressItem.getKey();
}
@Override
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
String address = route(triggerParam.getJobId(), addressList);
return new ReturnT<>(address);
}
}
路由策略-最近最久未使用
利用LinkedHashMap
在 accessOrder = true
时,访问顺序排序的特性,进行路由选取
记录的缓存每24小时清空一次。
对于那种一天及以上才执行一次的任务,在地址配置不变的情况下,会一直调度第一个路由地址。
/**
* 路由策略-LRU(Least Recently Used):最近最久未使用,时间
* <p>
* 单个JOB对应的每个执行器,最久未使用的优先被选举
*
* @author xuxueli on 17/3/10.
*/
public class ExecutorRouteLRU extends ExecutorRouter {
/**
* JobId -> [路由地址 -> 路由地址]
* 用LinkedHashMap是为了使用accessOrder参数特性, 其中的value没有用
*/
private static ConcurrentMap<Integer, LinkedHashMap<String, String>> jobLRUMap = new ConcurrentHashMap<>();
private static long CACHE_VALID_TIME = 0;
public String route(int jobId, List<String> addressList) {
// cache clear 每天清空一次缓存
if (System.currentTimeMillis() > CACHE_VALID_TIME) {
jobLRUMap.clear();
CACHE_VALID_TIME = System.currentTimeMillis() + 1000 * 60 * 60 * 24;
}
// init lru
LinkedHashMap<String, String> lruItem = jobLRUMap.get(jobId);
if (lruItem == null) {
/**
* LinkedHashMap
* a、accessOrder:true=访问顺序排序(get/put时排序);false=插入顺序排期;
* b、removeEldestEntry:新增元素时将会调用,返回true时会删除最老元素;可封装LinkedHashMap并重写该方法,比如定义最大容量,超出是返回true即可实现固定长度的LRU算法;
*/
lruItem = new LinkedHashMap<>(16, 0.75f, true);
jobLRUMap.putIfAbsent(jobId, lruItem);
}
// 更新map里面的地址,先插入新增地址,再移除已经不在配置中的地址
// put new
for (String address : addressList) {
if (!lruItem.containsKey(address)) {
lruItem.put(address, address);
}
}
// remove old
List<String> delKeys = new ArrayList<>();
for (String existKey : lruItem.keySet()) {
if (!addressList.contains(existKey)) {
delKeys.add(existKey);
}
}
if (delKeys.size() > 0) {
for (String delKey : delKeys) {
lruItem.remove(delKey);
}
}
// load 获取排序的第一条数据(即最久未使用)
String eldestKey = lruItem.entrySet().iterator().next().getKey();
// 由于 accessOrder 设置为了true,会根据访问方法get重新排序: get之后元素被排序到最后,达到根据调度时间正序排序的效果
String eldestValue = lruItem.get(eldestKey);
return eldestValue;
}
@Override
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
String address = route(triggerParam.getJobId(), addressList);
return new ReturnT<>(address);
}
}
路由策略-故障转移
按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度
/**
* 路由策略-故障转移
* <p>按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度</p>
*
* @author xuxueli on 17/3/10.
*/
public class ExecutorRouteFailover extends ExecutorRouter {
@Override
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
StringBuffer beatResultSB = new StringBuffer();
for (String address : addressList) {
// beat
ReturnT<String> beatResult;
try {
ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
beatResult = executorBiz.beat();
} catch (Exception e) {
logger.error(e.getMessage(), e);
beatResult = new ReturnT<>(ReturnT.FAIL_CODE, "" + e);
}
beatResultSB.append((beatResultSB.length() > 0) ? "<br><br>" : "")
.append(I18nUtil.getString("jobconf_beat") + ":")
.append("<br>address:").append(address)
.append("<br>code:").append(beatResult.getCode())
.append("<br>msg:").append(beatResult.getMsg());
// beat success
if (beatResult.getCode() == ReturnT.SUCCESS_CODE) {
beatResult.setMsg(beatResultSB.toString());
beatResult.setContent(address);
return beatResult;
}
}
return new ReturnT<>(ReturnT.FAIL_CODE, beatResultSB.toString());
}
}
路由策略-忙碌转移
按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度
/**
* 路由策略-忙碌转移
*
* @author xuxueli on 17/3/10.
*/
public class ExecutorRouteBusyover extends ExecutorRouter {
@Override
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
StringBuffer idleBeatResultSB = new StringBuffer();
for (String address : addressList) {
// beat
ReturnT<String> idleBeatResult;
try {
ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
idleBeatResult = executorBiz.idleBeat(new IdleBeatParam(triggerParam.getJobId()));
} catch (Exception e) {
logger.error(e.getMessage(), e);
idleBeatResult = new ReturnT<>(ReturnT.FAIL_CODE, "" + e);
}
idleBeatResultSB.append((idleBeatResultSB.length() > 0) ? "<br><br>" : "")
.append(I18nUtil.getString("jobconf_idleBeat") + ":")
.append("<br>address:").append(address)
.append("<br>code:").append(idleBeatResult.getCode())
.append("<br>msg:").append(idleBeatResult.getMsg());
// beat success
if (idleBeatResult.getCode() == ReturnT.SUCCESS_CODE) {
idleBeatResult.setMsg(idleBeatResultSB.toString());
idleBeatResult.setContent(address);
return idleBeatResult;
}
}
return new ReturnT<>(ReturnT.FAIL_CODE, idleBeatResultSB.toString());
}
}




更多推荐
所有评论(0)