仿kafka实现java版时间轮
仿kafka实现java版时间轮
系统定时、超时
在我们平时的项目开发中,会设置系统的超时时间,比如在http接口中设置超时时间,在定时调度中也会用到。在jdk的开发的实现Timer和ScheduledThreadPoolExecutor、DelayQueue定时调度中使用的是最小堆,我们知道最小堆的插入时间复杂度是log(n)。在kafka中,采用的是基于O(1)的时间轮算法,本节我们就使用java来模仿kafka层级时间轮。
时间轮简介
时间轮的实现思想是借鉴我们的钟表,秒针转动一圈,分钟移动一个,分钟转动一格,始终移动一格,在kafka中称为桶bucket。下面文章中称为槽。
在kafka中第一个槽默认一格表示1ms,第一个时间轮是20个槽,所以第一一个时间轮代表20ms。第二个时间轮的每一格式第一个时间轮的总时间,也就是20ms,所以第二个时间轮可表示的时间范围是400ms,依次类推,第三个时间轮可表示的时间范围是8s,第四个时间轮是160s等等。由于时间在向前推进,故一段时间后,第二个时间轮上的任务会向转移到第一个时间轮上,这样递进的方式,最终任务都会执行。
kafka中的每个槽表示一个TimerTaskList,每个任务加到这个TimerTaskList上,如下图中时间轮中每个槽都代表一个TimerTaskList。
1.任务TimerTask源码分析
TimerTask类表示一个要执行的任务,实现了Runnable接口
public abstract class TimerTask implements Runnable {
public long delayMs; //表示当前任务延迟多久后执行(单位ms),比如说延迟3s,则此值为3000
public TimerTask(long delayMs) {
this.delayMs = delayMs;
}
// 指向TimerTaskEntry对象,一个TimerTaskEntry包含一个TimerTask,TimerTaskEntry是可复用的
private TimerTaskList.TimerTaskEntry timerTaskEntry = null;
// 取消当前任务,就是从TimerTaskEntry移出TimerTask,并且把当前的timerTaskEntry置空
public synchronized void cancel() {
if(timerTaskEntry != null) {
timerTaskEntry.remove();
}
timerTaskEntry = null;
}
//设置当前任务绑定的TimerTaskEntry
public synchronized void setTimerTaskEntry(TimerTaskList.TimerTaskEntry entry) {
if(timerTaskEntry != null && timerTaskEntry != entry) {
timerTaskEntry.remove();
}
timerTaskEntry = entry;
}
public TimerTaskList.TimerTaskEntry getTimerTaskEntry() {
return timerTaskEntry;
}
}
2.任务包装类TimerTaskEntry
TimerTaskEntry是TimerTask的包装,实现了Compareable接口,用来比较连个任务的过期时间,以决定任务list插入的顺序
public static class TimerTaskEntry implements Comparable<TimerTaskEntry>{
//包含一个任务
public TimerTask timerTask;
// 任务的过期时间,此处的过期时间设置的过期间隔+系统当前时间(毫秒)
public Long expirationMs;
// 当前任务属于哪一个列表
private TimerTaskList list;
// 当前任务的上一个任务,用双向列表连接
private TimerTaskEntry prev;
private TimerTaskEntry next;
public TimerTaskEntry(TimerTask timerTask,Long expirationMs) {
this.timerTask = timerTask;
this.expirationMs = expirationMs;
// 传递进来任务TimerTask,并设置TimerTask的包装类
if(timerTask != null) {
timerTask.setTimerTaskEntry(this);
}
}
// 任务的取消,就是判断任务TimerTask的Entry是否是当前任务
public boolean cancel() {
return timerTask.getTimerTaskEntry() != this;
}
// 任务的移出
public void remove() {
TimerTaskList currentList = list;
while(currentList != null) {
currentList.remove(this);
currentList = list;
}
}
// 比较两个任务在列表中的位置,及那个先执行
@Override
public int compareTo(TimerTaskEntry that) {
return Long.compare(expirationMs,that.expirationMs);
}
}
3.每个槽中的任务列表
在时间轮中每个槽代表一个列表,即TimerTaskList,每个TimerTaskList中包含多个TimerTaskEntry,并且用双向列表链接。TimerTaskList实现了Delayed接口,用于返回剩余的时间,把上层时间轮的任务移动位置。
public class TimerTaskList implements Delayed {
//当前列表中包含的任务数
private AtomicInteger taskCounter;
// 列表的头结点
private TimerTaskEntry root;
// 过期时间
private AtomicLong expiration = new AtomicLong(-1L);
public TimerTaskList(AtomicInteger taskCounter) {
this.taskCounter = taskCounter;
this.root = new TimerTaskEntry(null,-1L);
root.next = root;
root.prev = root;
}
// 给当前槽设置过期时间
public boolean setExpiration(Long expirationMs) {
return expiration.getAndSet(expirationMs) != expirationMs;
}
public Long getExpiration() {
return expiration.get();
}
// 用于遍历当前列表中的任务
public synchronized void foreach(Consumer<TimerTask> f) {
TimerTaskEntry entry = root.next;
while(entry != root) {
TimerTaskEntry nextEntry = entry.next;
if(!entry.cancel()) {
f.accept(entry.timerTask);
}
entry = nextEntry;
}
}
// 添加任务到列表中
public void add(TimerTaskEntry timerTaskEntry) {
boolean done = false;
while(!done) {
timerTaskEntry.remove();
synchronized (this) {
synchronized (timerTaskEntry) {
if(timerTaskEntry.list == null) {
TimerTaskEntry tail = root.prev;
timerTaskEntry.next = root;
timerTaskEntry.prev = tail;
timerTaskEntry.list = this;
tail.next = timerTaskEntry;
root.prev = timerTaskEntry;
taskCounter.incrementAndGet();
done = true;
}
}
}
}
}
//移出任务
private synchronized void remove(TimerTaskEntry timerTaskEntry) {
synchronized (timerTaskEntry) {
if(timerTaskEntry.list == this) {
timerTaskEntry.next.prev = timerTaskEntry.prev;
timerTaskEntry.prev.next = timerTaskEntry.next;
timerTaskEntry.next = null;
timerTaskEntry.prev = null;
timerTaskEntry.list = null;
taskCounter.decrementAndGet();
}
}
}
public synchronized void flush(Consumer<TimerTaskEntry> f) {
TimerTaskEntry head = root.next;
while(head != root) {
remove(head);
f.accept(head);
head = root.next;
}
expiration.set(-1L);
}
//获得当前任务剩余时间
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(Math.max(getExpiration() - System.currentTimeMillis(),0),TimeUnit.MICROSECONDS);
}
@Override
public int compareTo(Delayed d) {
TimerTaskList other = (TimerTaskList) d;
return Long.compare(getExpiration(),other.getExpiration());
}
}
4.时间轮结构
时间轮TimeWheel代表一层时间轮,即第一层时间轮表示20ms,主要功能是添加任务和,驱动时间轮向前。
public class TimingWheel {
private Long tickMs; //每一个槽表示的时间范围
private Integer wheelSize; // 时间轮大小,即每一层时间轮的大小
private Long startMs; // 系统的启动时间
private AtomicInteger taskCounter; // 当前层任务数
private DelayQueue<TimerTaskList> queue; //延迟队列,用于从队列取每个任务列表
private Long interval; //每一层时间轮代表的时间
private List<TimerTaskList> buckets; // 每一层的每一个槽中的时间任务列表
private Long currentTime; // 修正后的系统启动时间
private TimingWheel overflowWheel = null; // 上一层时间轮
public TimingWheel(Long tickMs, Integer wheelSize, Long startMs, AtomicInteger taskCounter, DelayQueue<TimerTaskList> queue) {
this.tickMs = tickMs;
this.wheelSize = wheelSize;
this.startMs = startMs;
this.taskCounter = taskCounter;
this.queue = queue;
interval = tickMs * wheelSize;
currentTime = startMs - (startMs % tickMs); //当前时间,往前推
buckets = new ArrayList<>(wheelSize);
for(int i = 0;i < wheelSize;i++) {
buckets.add(new TimerTaskList(taskCounter)); //创建每一个槽中的列表
}
}
// 创建上层时间轮
public synchronized void addOverflowWheel() {
if(overflowWheel == null) {
overflowWheel = new TimingWheel(
interval, // 此处interval即表示上一层时间轮表示的范围
wheelSize,
currentTime,
taskCounter,
queue
);
}
}
// 添加任务
public boolean add(TimerTaskList.TimerTaskEntry timerTaskEntry) {
Long expiration = timerTaskEntry.expirationMs;
Long thisTime = currentTime + tickMs;
// 任务是否已经取消,取消则返回
if(timerTaskEntry.cancel()) {
return false;
// 当前任务是否已经过期,如果过期则返回false,要立即执行
}else if(expiration < currentTime + tickMs) {
return false;
// 判断当前任务能否在添加到当前时间轮
}else if(expiration < currentTime + interval) {
Long virtualId = expiration / tickMs;
// 计算当前任务要分配在哪个槽中
int whereBucket = (int) (virtualId % wheelSize);
TimerTaskList bucket = buckets.get((int)(virtualId % wheelSize));
bucket.add(timerTaskEntry);
long bucketExpiration = virtualId * tickMs;
//更新槽的过期时间,添加入延迟队列
if(bucket.setExpiration(virtualId * tickMs)) {
queue.offer(bucket);
}
return true;
}else {
//添加任务到高层时间轮
if(overflowWheel == null) addOverflowWheel();
return overflowWheel.add(timerTaskEntry);
}
}
// 向前驱动时间
public void advanceClock(Long timeMs) {
if(timeMs >= currentTime + tickMs) {
currentTime = timeMs - (timeMs % tickMs);
if(overflowWheel != null) {
overflowWheel.advanceClock(currentTime);
}
}
}
}
5. 时间轮接口
- kafka中提供了Timer接口,用于对外提供调用,分别是Timer#add添加任务;Timer#advanceClock驱动时间; Timer#size时间轮中总任务数;Timer#shutdown停止时间轮
public interface Timer {
void add(TimerTask timerTask);
boolean advanceClock(Long timeoutMs) throws Exception;
int size();
void shutdown();
}
- Timer的实现类是SystemTimer
public class SystemTimer implements Timer {
private String executorName;
private Long tickMs = 1L;
private Integer wheelSize = 20;
private Long startMs = System.currentTimeMillis();
//用来执行TimerTask任务
private ExecutorService taskExecutor =
Executors.newFixedThreadPool(1,(runnable) -> {
Thread thread = new Thread(runnable);
thread.setName("executor-" + executorName);
thread.setDaemon(false);
return thread;
});
//延迟队列
private DelayQueue<TimerTaskList> delayQueue = new DelayQueue<>();
private AtomicInteger taskCounter = new AtomicInteger(0);
private TimingWheel timingWheel;
private ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock();
private ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock();
// 用来执行时间轮的重新排列,及上一个槽中的任务列表被执行后,后面的槽中的任务列表移动
private Consumer<TimerTaskEntry> reinsert = (timerTaskEntry) -> addTimerTaskEntry(timerTaskEntry);
public SystemTimer(String executorName, Long tickMs, Integer wheelSize, Long startMs) {
this.executorName = executorName;
this.tickMs = tickMs;
this.wheelSize = wheelSize;
this.startMs = startMs;
this.timingWheel = new TimingWheel(
tickMs,
wheelSize,
startMs,
taskCounter,
delayQueue
);
}
// 可能会多个线程操作,所以需要加锁
@Override
public void add(TimerTask timerTask) {
readLock.lock();
try{
addTimerTaskEntry(new TimerTaskEntry(timerTask,timerTask.delayMs + System.currentTimeMillis()));
}finally {
readLock.unlock();
}
}
private void addTimerTaskEntry(TimerTaskEntry timerTaskEntry) { // 往时间轮添加任务
if(!timingWheel.add(timerTaskEntry)) {
// 返回false并且任务未取消,则提交当前任务立即执行。
if(!timerTaskEntry.cancel()) {
taskExecutor.submit(timerTaskEntry.timerTask);
}
}
}
// 向前驱动时间轮
@Override
public boolean advanceClock(Long timeoutMs) throws Exception{
// 使用阻塞队列获取任务
TimerTaskList bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
if(bucket != null) {
writeLock.lock();
try{
while(bucket != null) {
timingWheel.advanceClock(bucket.getExpiration());
// 驱动时间后,需要移动TimerTaskList到上一个槽或者从上一层移动到本层
bucket.flush(reinsert);
bucket = delayQueue.poll();
}
}finally {
writeLock.unlock();
}
return true;
}else {
return false;
}
}
@Override
public int size() {
return taskCounter.get();
}
@Override
public void shutdown() {
taskExecutor.shutdown();
}
}
6. 时间轮接口
public class SystemTimerTest {
//驱动时间轮向前的线程
private static ExecutorService executorService = Executors.newFixedThreadPool(1);
public static SystemTimer timer = new SystemTimer("test",1000L,5,System.currentTimeMillis());
public static void runTask() throws Exception {
for(int i = 0;i < 10000;i+= 1000) {
// 添加任务,每个任务间隔1s
timer.add(new TimerTask(i) {
@Override
public void run() {
System.out.println("运行testTask的时间: " + System.currentTimeMillis());
}
});
}
}
public static void main(String[] args) throws Exception {
runTask();
executorService.submit(() -> {
while(true) {
try {
// 驱动时间轮线程间隔0.2s驱动
timer.advanceClock(200L);
} catch (Exception e) {
e.printStackTrace();
}
}
});
Thread.sleep(1000000);
timer.shutdown();
executorService.shutdown();
}
}
更多推荐
所有评论(0)