概念分析

简单时间轮算法是时间轮算法的入门内容。笔者暂时研究到这块,下面做下详细的分享。

时间轮算法:是指有一条时间闭环履带,每一节代表等长时间,假想有一个指针在这条履带圆心转动,转动速度恒定。当转到某一节时候,可以从这节(假想每一节都是一个容器)取出要执行的任务,或者删除掉这节中的特定任务,或者是往该节添加新的任务。如果这个指针一直转动,那么每节中的每个任务就可以按照恒定的时间周期执行。

实际当中,这条履带的长度就是代表了所有任务中的最大任务周期,履带中的所有任务的周期都不能超过履带长度。

代码示例

下面笔者以java代码制造一个简单的时间轮示例:

声明:上述履带是一个数组,数组就是履带长度,数组每一个元素是set集合,每个set集合存放多个任务。

代码实现:

时间轮履带接口

/**
 * 时间轮接口
 */
public interface IWheel {
    /** 添加任务 */
    void addTask(ITask task);

    /** 注销轮子 */
    void destoryWheel();
}

时间轮履带

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
/**
 * 简单时间轮
 * 简单时间轮只有一个轮子,所以任务的时间间隔不能大于轮子大小,执行任务大于轮子大小,按最大周期来执行
 */
public class SimpleWheel implements IWheel{
    Logger logger = LoggerFactory.getLogger(SimpleWheel.class);
    /** 轮子定时任务 */
    private Timer timer;
    /** 轮子 */
    private Set<ITask>[] wheel;
    /** 定时频率间隔(秒)*/
    private int interval;
    /** 轮子大小 */
    private int wheelSize;
    /** 第一次运行隔几个偏移(新加任务隔几个偏移后执行)*/
    private int firstOffset;
    /** 当前执行偏移数 */
    private int currentOffset;

    /**
    * 构造轮子函数 wheelSize:轮子大小,即最大任务周期;interval:轮子行进速度,即指针转动速度;firstOffset:新任务执行偏移量,就是每次任务添加进来后要隔多久才开始执行
    */    
    public SimpleWheel(int wheelSize, int interval, int firstOffset) throws Exception{
        if(wheelSize < 0 || interval < 0 || firstOffset < 0){
            throw new Exception("轮子大小、频率、首次间隔必须大于0");
        }
        this.wheelSize = wheelSize;
        this.interval = interval;
        this.firstOffset =firstOffset;
        this.wheel = new HashSet[wheelSize];
        for(int i=0;i<wheelSize;i++){
            this.wheel[i] = new HashSet<ITask>();
        }
        timer = new Timer();
        timer.schedule(new WheelTimerTask(), 0,interval*1000);
        logger.debug("SimpleWheel 时间轮初始化完成 wheelSize={},interval={},firstOffset={}",wheelSize,interval,firstOffset);
    }

    @Override
    public void addTask(ITask task){
        //新任务添加到时间轮的位置,通过首次偏移量加上现在指针的位置加上任务下一次执行的时间的对轮子大小进行求余
        int n = (firstOffset + currentOffset+task.getNextOffset()) % wheelSize;
        wheel[n].add(task);
        logger.debug("添加任务");
    }

    @Override
    public void destoryWheel(){
        //取消轮子
        timer.cancel();
        //因为档次的timer会继续执行,所以这样处理后,会尽可能多的中断任务。
        wheel = null;
        logger.debug("注销此轮子");
    }
    //执行任务
    private void processTask(ITask task,Long currentTime,int currentOffset){
        try {
            task.exec(task);
        }catch (Exception e){
            e.printStackTrace();
        }
        logger.debug("任务执行完成");
    }

    class WheelTimerTask extends TimerTask {
        public void run() {
            Long currentTime = System.currentTimeMillis();
            int n = currentOffset;
            currentOffset++;
            currentOffset = currentOffset % wheelSize;
            if(wheel[n].size() == 0){
                return;
            }
            Set<ITask> tasks = wheel[n];
            Iterator<ITask> it = tasks.iterator();
            while(it.hasNext()){
                processTask(it.next(),currentTime,n);
                it.remove();
            }
            logger.debug("第{}个刻度执行",n);
        }
    }
}

时间轮工具类


/**
 * 时间轮工具类
 */
public class TimingWheelUtils {
    //保证线程安全 存储所有的轮子
    private static Map<String,IWheel> wheels = new ConcurrentHashMap<>();

    public static Map<String, IWheel> getWheels() {
        return wheels;
    }

    static {
        try {
          //类初始化时候自动创建时间轮
          createSimpleWheel(MessageCenterConstant.TIMEWHEELFOR180S,181,1,5);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 创建一个简单时间轮
     * @param wheelKey   时间轮key
     * @param wheelSize 轮子大小
     * @param interval 定时频率间隔(秒)
     * @param firstOffset 新任务首次执行隔几个偏移
     */
    public static void createSimpleWheel(String wheelKey,int wheelSize, int interval, int firstOffset) throws Exception {
        if(wheels.get(wheelKey) != null){
            throw new Exception("此键已存在时间轮,键名="+wheelKey);
        }
        SimpleWheel simpleWheel = new SimpleWheel(wheelSize,interval,firstOffset);
        wheels.put(wheelKey,simpleWheel);
    }

    /**
     * 添加任务
     * @param wheelKey 时间轮key
     * @param task  任务
     * @return
     */
    public static boolean addTask(String wheelKey, ITask task){
        if(wheels.get(wheelKey) == null){
            return false;
        }
        IWheel wheel = wheels.get(wheelKey);
        wheel.addTask(task);
        return true;
    }

    /**
     * 移除时间轮
     * @param wheelKey
     */
    public static void destory(String wheelKey){
        if(wheels.get(wheelKey) == null){
            return;
        }
        wheels.get(wheelKey).destoryWheel();
        wheels.remove(wheelKey);
    }

}

任务接口

/**
 */
public interface ITask {
    /**
     * 下次执行时间间隔
     * @return
     */
    int getNextOffset();

    /**
     * 判断访问次数是否达到了上限
     * @return
     */
    boolean reachTop();

    /**
     * 执行任务
     */
    void exec(ITask task);

}

任务实现类

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * created by 魏霖涛 on 2017/9/7 0007
 */
public class TimeWheelBean implements ITask{
    private Logger logger = LoggerFactory.getLogger("timeWheel");
    private int nextOffset;
    /**超时次数*/
    private int timeOutCount;

    public int getTimeOutCount() {
        return timeOutCount;
    }

    public void setTimeOutCount(int timeOutCount) {
        this.timeOutCount = timeOutCount;
    }

    @Override
    public int getNextOffset() {
        return nextOffset;
    }

    public void setNextOffset(int nextOffset) {
        this.nextOffset = nextOffset;
    }

    public TimeWheelBean(int nextOffset,int timeOutCount) {
        this.nextOffset = nextOffset;
        this.timeOutCount = timeOutCount;
    }

    public void decreTimeOut(){
        this.timeOutCount--;
    }

    @Override
    public boolean reachTop(){
        if(this.getTimeOutCount()<=0){
            logger.info("访问次数达到上限");
            return false;
        }
        return true;
    }
    @Override
    public void exec(ITask task) {
        //执行任务
        ...
     }
}

时间轮添加任务

/**
 * 添加任务到时间轮
 * @param timeWheelBean
 */
@Override
public void dealTimeTask(TimeWheelBean timeWheelBean){
    IWheel iWheel = TimingWheelUtils.getWheels().get(MessageCenterConstant.TIMEWHEELFOR180S);
    if(timeWheelBean.reachTop()){
        iWheel.addTask(timeWheelBean);
    }

下一篇 讲解时间轮结合redis消息队列进行任务处理和缓存



Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐