本文基于xxl-job的2.3.1版本

基于源码介绍定时任务的路由选取策略

基本说明

路由策略在创建任务时选择,页面上的默认值为【第一个】。

当任务的执行器路由地址只有一个时,不需要进行路由判断,建议选取策略【第一个】。

当执行器地址有多个时,会根据选择的路由策略,进行执行路由选择,选取一个地址进行调度。

其中【分片广播】比较特殊,会对执行器下所有路由进行调用执行。

代码功能解读

xxl-job触发器代码主要在 com.xxl.job.admin.core.route 包下。

路由策略的实现,除了【分片广播】策略的逻辑被写在了触发器中,其他的均定义在com.xxl.job.admin.core.route.strategy

以下策略介绍不包含分片广播

包含的路由策略如下图:

在这里插入图片描述

路由策略选取

通过枚举类定义路由策略(除了分片广播)ExecutorRouteStrategyEnum,并将策略类 router与枚举类型进行绑定。

通过枚举类型直接能获取到对应路由策略的实现类。

这里可以看到,因为分片广播的逻辑代码放在了触发器 XxlJobTrigger 中,没有对应的实现类,所以为null。

/**
 * 执行器路由选取策略
 *
 * @author xuxueli on 17/3/10.
 */
public enum ExecutorRouteStrategyEnum {

    FIRST(I18nUtil.getString("jobconf_route_first"), new ExecutorRouteFirst()),
    LAST(I18nUtil.getString("jobconf_route_last"), new ExecutorRouteLast()),
    ROUND(I18nUtil.getString("jobconf_route_round"), new ExecutorRouteRound()),
    RANDOM(I18nUtil.getString("jobconf_route_random"), new ExecutorRouteRandom()),
    CONSISTENT_HASH(I18nUtil.getString("jobconf_route_consistenthash"), new ExecutorRouteConsistentHash()),
    LEAST_FREQUENTLY_USED(I18nUtil.getString("jobconf_route_lfu"), new ExecutorRouteLFU()),
    LEAST_RECENTLY_USED(I18nUtil.getString("jobconf_route_lru"), new ExecutorRouteLRU()),
    FAILOVER(I18nUtil.getString("jobconf_route_failover"), new ExecutorRouteFailover()),
    BUSYOVER(I18nUtil.getString("jobconf_route_busyover"), new ExecutorRouteBusyover()),
    SHARDING_BROADCAST(I18nUtil.getString("jobconf_route_shard"), null);

    ExecutorRouteStrategyEnum(String title, ExecutorRouter router) {
        this.title = title;
        this.router = router;
    }

    private final String title;

    /**
     * 路由策略实现类
     */
    private final ExecutorRouter router;

    public String getTitle() {
        return title;
    }

    public ExecutorRouter getRouter() {
        return router;
    }

    public static ExecutorRouteStrategyEnum match(String name, ExecutorRouteStrategyEnum defaultItem) {
        if (name != null) {
            for (ExecutorRouteStrategyEnum item : ExecutorRouteStrategyEnum.values()) {
                if (item.name().equals(name)) {
                    return item;
                }
            }
        }
        return defaultItem;
    }

}

路由策略接口

这里使用了策略模式,策略的实现类通过实现 ExecutorRouter 抽象类,绑定枚举进行使用。

如果使用中有什么特殊的选举需求,可以通过实现ExecutorRouter 进行拓展。

/**
 * 路由策略抽象类
 *
 * @author xuxueli on 17/3/10.
 */
public abstract class ExecutorRouter {
    protected static Logger logger = LoggerFactory.getLogger(ExecutorRouter.class);

    /**
     * route address
     *
     * @param addressList 执行器的路由地址配置
     * @return ReturnT.content=address
     */
    public abstract ReturnT<String> route(TriggerParam triggerParam, List<String> addressList);

}

路由策略-第一个

取路由地址数组中第一个

/**
 * 路由策略-第一个
 *
 * @author xuxueli on 17/3/10.
 */
public class ExecutorRouteFirst extends ExecutorRouter {

    @Override
    public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
        return new ReturnT<>(addressList.get(0));
    }

}

路由策略-最后一个

取路由地址数组中第最后一个

/**
 * 路由策略-最后一个
 *
 * @author xuxueli on 17/3/10.
 */
public class ExecutorRouteLast extends ExecutorRouter {

    @Override
    public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
        return new ReturnT<>(addressList.get(addressList.size() - 1));
    }

}

路由策略-轮询

xxl-job
xxl-job: 是一个分布式任务调度平台,核心设计目标是开发迅速、学习简单、轻量级、易扩展。

通过一个 ConcurrentMap<Integer, AtomicInteger> 缓存记录任务ID以及执行数字,以执行数字取模作为路由地址下标,选取地址返回。

每次调用后执行数字 +1,达到路由地址轮询调用的作用。

缓存每24小时清空一次

/**
 * 路由策略-轮询
 *
 * @author xuxueli on 17/3/10.
 */
public class ExecutorRouteRound extends ExecutorRouter {

    /**
     * 轮询记录 JobId -> 调用次数递增数字(初始值为100以内的随机数, 每次调用递增, 用于取模作为路由地址数组index, 决定当次调用路由)
     */
    private static ConcurrentMap<Integer, AtomicInteger> routeCountEachJob = new ConcurrentHashMap<>();
    private static long CACHE_VALID_TIME = 0;

    private static int count(int jobId) {
        // cache clear 每24小时清空一次 轮询记录缓存
        if (System.currentTimeMillis() > CACHE_VALID_TIME) {
            routeCountEachJob.clear();
            CACHE_VALID_TIME = System.currentTimeMillis() + 1000 * 60 * 60 * 24;
        }

        AtomicInteger count = routeCountEachJob.get(jobId);
        if (count == null || count.get() > 1000000) {
            // 初始化时主动Random一次,缓解首次压力
            count = new AtomicInteger(new Random().nextInt(100));
        } else {
            // count++ 加一并返回
            count.addAndGet(1);
        }
        routeCountEachJob.put(jobId, count);
        return count.get();
    }

    @Override
    public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
        String address = addressList.get(count(triggerParam.getJobId()) % addressList.size());
        return new ReturnT<>(address);
    }

}

路由策略-随机

通过 Random 进行随机数获取,决定路由下标返回地址

/**
 * 路由策略-随机
 * <p>随机选择在线的机器</p>
 *
 * @author xuxueli on 17/3/10.
 */
public class ExecutorRouteRandom extends ExecutorRouter {

    private static final Random localRandom = new Random();

    @Override
    public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
        String address = addressList.get(localRandom.nextInt(addressList.size()));
        return new ReturnT<>(address);
    }

}

路由策略-一致性Hash

通过一致性Hash,让每个Job固定调用其中一台机器,具体哪台机器根据hash值的范围选取。

同时通过循环,扩充hash的集合大小,以保证分组下机器分配job足够平均。

/**
 * 分组下机器地址相同,不同JOB均匀散列在不同机器上,保证分组下机器分配JOB平均;且每个JOB固定调度其中一台机器;
 * a、virtual node:解决不均衡问题
 * b、hash method replace hashCode:String的hashCode可能重复,需要进一步扩大hashCode的取值范围
 *
 * @author xuxueli on 17/3/10.
 */
public class ExecutorRouteConsistentHash extends ExecutorRouter {

    private static int VIRTUAL_NODE_NUM = 100;

    /**
     * get hash code on 2^32 ring (md5散列的方式计算hash值)
     *
     * @param key
     * @return
     */
    private static long hash(String key) {

        // md5 byte
        MessageDigest md5;
        try {
            md5 = MessageDigest.getInstance("MD5");
        } catch (NoSuchAlgorithmException e) {
            throw new RuntimeException("MD5 not supported", e);
        }
        md5.reset();
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);

        md5.update(keyBytes);
        byte[] digest = md5.digest();

        // hash code, Truncate to 32-bits
        long hashCode = ((long) (digest[3] & 0xFF) << 24)
                | ((long) (digest[2] & 0xFF) << 16)
                | ((long) (digest[1] & 0xFF) << 8)
                | (digest[0] & 0xFF);

        return hashCode & 0xffffffffL;
    }

    public String hashJob(int jobId, List<String> addressList) {

        // ------A1------A2-------A3------
        // -----------J1------------------
        TreeMap<Long, String> addressRing = new TreeMap<>();
        // 每个复制循环100次,取hash值,让map足够大,以保证分组下机器分配JOB足够平均
        for (String address : addressList) {
            for (int i = 0; i < VIRTUAL_NODE_NUM; i++) {
                long addressHash = hash("SHARD-" + address + "-NODE-" + i);
                addressRing.put(addressHash, address);
            }
        }

        long jobHash = hash(String.valueOf(jobId));
        // 根据jobId的hash结果排序,获取出大于等于该结果的部分,存在则取这部分的第一条,不存在则选取第一个
        SortedMap<Long, String> lastRing = addressRing.tailMap(jobHash);
        if (!lastRing.isEmpty()) {
            return lastRing.get(lastRing.firstKey());
        }
        return addressRing.firstEntry().getValue();
    }

    @Override
    public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
        String address = hashJob(triggerParam.getJobId(), addressList);
        return new ReturnT<>(address);
    }

}

路由策略-最不经常使用

使用 ConcurrentMap<Integer, HashMap<String, Integer>> jobLfuMap 记录每个任务ID在路由地址下的调用次数,选取调用次数最小的路由地址。

记录的缓存每24小时清空一次。

依照实现逻辑,如果运行中修改配置,给执行器添加了个路由地址,短时间内调度会全部涌入新地址中。高频任务调度的情况下需要注意下。

/**
 * 路由策略-LFU(Least Frequently Used):最不经常使用,频率/次数
 * <p>
 * 单个JOB对应的每个执行器,使用频率最低的优先被选举
 *
 * @author xuxueli on 17/3/10.
 */
public class ExecutorRouteLFU extends ExecutorRouter {

    /**
     * JobId -> [路由地址 -> 调用次数]
     */
    private static ConcurrentMap<Integer, HashMap<String, Integer>> jobLfuMap = new ConcurrentHashMap<>();
    private static long CACHE_VALID_TIME = 0;

    public String route(int jobId, List<String> addressList) {

        // cache clear
        if (System.currentTimeMillis() > CACHE_VALID_TIME) {
            jobLfuMap.clear();
            CACHE_VALID_TIME = System.currentTimeMillis() + 1000 * 60 * 60 * 24;
        }

        // lfu item init
        HashMap<String, Integer> lfuItemMap = jobLfuMap.get(jobId);     // Key排序可以用TreeMap+构造入参Compare;Value排序暂时只能通过ArrayList;
        if (lfuItemMap == null) {
            lfuItemMap = new HashMap<>();
            jobLfuMap.putIfAbsent(jobId, lfuItemMap);   // 避免重复覆盖
        }

        // put new
        for (String address : addressList) {
            // 新地址,或者已经超过一百万,使用随机数进行初始化。设置上限避免无上限的递增
            if (!lfuItemMap.containsKey(address) || lfuItemMap.get(address) > 1000000) {
                lfuItemMap.put(address, new Random().nextInt(addressList.size()));  // 初始化时主动Random一次,缓解首次压力
            }
        }
        // remove old
        List<String> delKeys = new ArrayList<>();
        for (String existKey : lfuItemMap.keySet()) {
            if (!addressList.contains(existKey)) {
                delKeys.add(existKey);
            }
        }
        if (delKeys.size() > 0) {
            for (String delKey : delKeys) {
                lfuItemMap.remove(delKey);
            }
        }

        // load least userd count address
        List<Map.Entry<String, Integer>> lfuItemList = new ArrayList<>(lfuItemMap.entrySet());
        // 根据value进行排序,即根据调用次数进行正序排序,第一条则为在24小时内,调用次数最少的(不考虑随机数的差值影响)
        // 排序代码可以简化为 lfuItemList.sort(Entry.comparingByValue()); -- java8
        Collections.sort(lfuItemList, new Comparator<Map.Entry<String, Integer>>() {
            @Override
            public int compare(Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) {
                return o1.getValue().compareTo(o2.getValue());
            }
        });

        Map.Entry<String, Integer> addressItem = lfuItemList.get(0);
        String minAddress = addressItem.getKey();
        addressItem.setValue(addressItem.getValue() + 1);

        return addressItem.getKey();
    }

    @Override
    public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
        String address = route(triggerParam.getJobId(), addressList);
        return new ReturnT<>(address);
    }

}

路由策略-最近最久未使用

利用LinkedHashMapaccessOrder = true 时,访问顺序排序的特性,进行路由选取

记录的缓存每24小时清空一次。

对于那种一天及以上才执行一次的任务,在地址配置不变的情况下,会一直调度第一个路由地址。

/**
 * 路由策略-LRU(Least Recently Used):最近最久未使用,时间
 * <p>
 * 单个JOB对应的每个执行器,最久未使用的优先被选举
 *
 * @author xuxueli on 17/3/10.
 */
public class ExecutorRouteLRU extends ExecutorRouter {

    /**
     * JobId -> [路由地址 -> 路由地址]
     * 用LinkedHashMap是为了使用accessOrder参数特性, 其中的value没有用
     */
    private static ConcurrentMap<Integer, LinkedHashMap<String, String>> jobLRUMap = new ConcurrentHashMap<>();
    private static long CACHE_VALID_TIME = 0;

    public String route(int jobId, List<String> addressList) {

        // cache clear 每天清空一次缓存
        if (System.currentTimeMillis() > CACHE_VALID_TIME) {
            jobLRUMap.clear();
            CACHE_VALID_TIME = System.currentTimeMillis() + 1000 * 60 * 60 * 24;
        }

        // init lru
        LinkedHashMap<String, String> lruItem = jobLRUMap.get(jobId);
        if (lruItem == null) {
            /**
             * LinkedHashMap
             *      a、accessOrder:true=访问顺序排序(get/put时排序);false=插入顺序排期;
             *      b、removeEldestEntry:新增元素时将会调用,返回true时会删除最老元素;可封装LinkedHashMap并重写该方法,比如定义最大容量,超出是返回true即可实现固定长度的LRU算法;
             */
            lruItem = new LinkedHashMap<>(16, 0.75f, true);
            jobLRUMap.putIfAbsent(jobId, lruItem);
        }

        // 更新map里面的地址,先插入新增地址,再移除已经不在配置中的地址
        // put new
        for (String address : addressList) {
            if (!lruItem.containsKey(address)) {
                lruItem.put(address, address);
            }
        }
        // remove old
        List<String> delKeys = new ArrayList<>();
        for (String existKey : lruItem.keySet()) {
            if (!addressList.contains(existKey)) {
                delKeys.add(existKey);
            }
        }
        if (delKeys.size() > 0) {
            for (String delKey : delKeys) {
                lruItem.remove(delKey);
            }
        }

        // load 获取排序的第一条数据(即最久未使用)
        String eldestKey = lruItem.entrySet().iterator().next().getKey();
        // 由于 accessOrder 设置为了true,会根据访问方法get重新排序: get之后元素被排序到最后,达到根据调度时间正序排序的效果
        String eldestValue = lruItem.get(eldestKey);
        return eldestValue;
    }

    @Override
    public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
        String address = route(triggerParam.getJobId(), addressList);
        return new ReturnT<>(address);
    }

}

路由策略-故障转移

按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度

/**
 * 路由策略-故障转移
 * <p>按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度</p>
 *
 * @author xuxueli on 17/3/10.
 */
public class ExecutorRouteFailover extends ExecutorRouter {

    @Override
    public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {

        StringBuffer beatResultSB = new StringBuffer();
        for (String address : addressList) {
            // beat
            ReturnT<String> beatResult;
            try {
                ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
                beatResult = executorBiz.beat();
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                beatResult = new ReturnT<>(ReturnT.FAIL_CODE, "" + e);
            }
            beatResultSB.append((beatResultSB.length() > 0) ? "<br><br>" : "")
                    .append(I18nUtil.getString("jobconf_beat") + ":")
                    .append("<br>address:").append(address)
                    .append("<br>code:").append(beatResult.getCode())
                    .append("<br>msg:").append(beatResult.getMsg());

            // beat success
            if (beatResult.getCode() == ReturnT.SUCCESS_CODE) {

                beatResult.setMsg(beatResultSB.toString());
                beatResult.setContent(address);
                return beatResult;
            }
        }
        return new ReturnT<>(ReturnT.FAIL_CODE, beatResultSB.toString());

    }
}

路由策略-忙碌转移

按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度

/**
 * 路由策略-忙碌转移
 *
 * @author xuxueli on 17/3/10.
 */
public class ExecutorRouteBusyover extends ExecutorRouter {

    @Override
    public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
        StringBuffer idleBeatResultSB = new StringBuffer();
        for (String address : addressList) {
            // beat
            ReturnT<String> idleBeatResult;
            try {
                ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
                idleBeatResult = executorBiz.idleBeat(new IdleBeatParam(triggerParam.getJobId()));
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                idleBeatResult = new ReturnT<>(ReturnT.FAIL_CODE, "" + e);
            }
            idleBeatResultSB.append((idleBeatResultSB.length() > 0) ? "<br><br>" : "")
                    .append(I18nUtil.getString("jobconf_idleBeat") + ":")
                    .append("<br>address:").append(address)
                    .append("<br>code:").append(idleBeatResult.getCode())
                    .append("<br>msg:").append(idleBeatResult.getMsg());

            // beat success
            if (idleBeatResult.getCode() == ReturnT.SUCCESS_CODE) {
                idleBeatResult.setMsg(idleBeatResultSB.toString());
                idleBeatResult.setContent(address);
                return idleBeatResult;
            }
        }

        return new ReturnT<>(ReturnT.FAIL_CODE, idleBeatResultSB.toString());
    }

}
阅读全文
AI总结
GitHub 加速计划 / xx / xxl-job
29
10
下载
xxl-job: 是一个分布式任务调度平台,核心设计目标是开发迅速、学习简单、轻量级、易扩展。
最近提交(Master分支:5 天前 )
07b58ac5 - 2 天前
4469c5d6 - 新增 AI 执行器示例,与 spring-ai 集成打通 - 内置一系列 AI 类任务模板,支持快速开发 - 新增通用 OllamaChat任务,支持自定义 prompt 和 input - 更新文档和数据库初始化脚本,增加 AI 执行器相关说明和示例数据 - 9 天前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐