看完這個實現之后,感覺還是要多看源碼,多研究。其實JRaft的定時任務調度器是基於Netty的時間輪來做的,如果沒有看過Netty的源碼,很可能並不知道時間輪算法,也就很難想到要去使用這么優秀的定時調度算法了。
對於介紹RepeatedTimer,我拿Node初始化的時候的electionTimer進行講解
this.electionTimer = new RepeatedTimer("JRaft-ElectionTimer", this.options.getElectionTimeoutMs()) {
@Override
protected void onTrigger() {
handleElectionTimeout();
}
@Override
protected int adjustTimeout(final int timeoutMs) {
//在一定范圍內返回一個隨機的時間戳
//為了避免同時發起選舉而導致失敗
return randomTimeout(timeoutMs);
}
};
構造器
由electionTimer的構造方法可以看出RepeatedTimer需要傳入兩個參數,一個是name,另一個是time
//timer是HashedWheelTimer
private final Timer timer;
//實例是HashedWheelTimeout
private Timeout timeout;
public RepeatedTimer(String name, int timeoutMs) {
//name代表RepeatedTimer實例的種類,timeoutMs是超時時間
this(name, timeoutMs, new HashedWheelTimer(new NamedThreadFactory(name, true), 1, TimeUnit.MILLISECONDS, 2048));
}
public RepeatedTimer(String name, int timeoutMs, Timer timer) {
super();
this.name = name;
this.timeoutMs = timeoutMs;
this.stopped = true;
this.timer = Requires.requireNonNull(timer, "timer");
}
在構造器中會根據傳進來的值初始化一個name和一個timeoutMs,然后實例化一個timer,RepeatedTimer的run方法是由timer進行回調。在RepeatedTimer中會持有兩個對象,一個是timer,一個是timeout
啟動RepeatedTimer
對於一個RepeatedTimer實例,我們可以通過start方法來啟動它:
public void start() {
//加鎖,只能一個線程調用這個方法
this.lock.lock();
try {
//destroyed默認是false
if (this.destroyed) {
return;
}
//stopped在構造器中初始化為ture
if (!this.stopped) {
return;
}
//啟動完一次后下次就無法再次往下繼續
this.stopped = false;
//running默認為false
if (this.running) {
return;
}
this.running = true;
schedule();
} finally {
this.lock.unlock();
}
}
在調用start方法進行啟動后會進行一系列的校驗和賦值,從上面的賦值以及加鎖的情況來看,這個是只能被調用一次的。然后會調用到schedule方法中
private void schedule() {
if(this.timeout != null) {
this.timeout.cancel();
}
final TimerTask timerTask = timeout -> {
try {
RepeatedTimer.this.run();
} catch (final Throwable t) {
LOG.error("Run timer task failed, taskName={}.", RepeatedTimer.this.name, t);
}
};
this.timeout = this.timer.newTimeout(timerTask, adjustTimeout(this.timeoutMs), TimeUnit.MILLISECONDS);
}
如果timeout不為空,那么會調用HashedWheelTimeout的cancel方法。然后封裝一個TimerTask實例,當執行TimerTask的run方法的時候會調用RepeatedTimer實例的run方法。然后傳入到timer中,TimerTask的run方法由timer進行調用,並將返回值賦值給timeout。
如果timer調用了TimerTask的run方法,那么便會回調到RepeatedTimer的run方法中:
RepeatedTimer#run
public void run() {
//加鎖
this.lock.lock();
try {
//表示RepeatedTimer已經被調用過
this.invoking = true;
} finally {
this.lock.unlock();
}
try {
//然后會調用RepeatedTimer實例實現的方法
onTrigger();
} catch (final Throwable t) {
LOG.error("Run timer failed.", t);
}
boolean invokeDestroyed = false;
this.lock.lock();
try {
this.invoking = false;
//如果調用了stop方法,那么將不會繼續調用schedule方法
if (this.stopped) {
this.running = false;
invokeDestroyed = this.destroyed;
} else {
this.timeout = null;
schedule();
}
} finally {
this.lock.unlock();
}
if (invokeDestroyed) {
onDestroy();
}
}
protected void onDestroy() {
// NO-OP
}
這個run方法會由timer進行回調,如果沒有調用stop或destroy方法的話,那么調用完onTrigger方法后會繼續調用schedule,然后一次次循環調用RepeatedTimer的run方法。
如果調用了destroy方法,在這里會有一個onDestroy的方法,可以由實現類override復寫執行一個鈎子。
HashedWheelTimer的基本介紹
HashedWheelTimer通過一定的hash規則將不同timeout的定時任務划分到HashedWheelBucket進行管理,而HashedWheelBucket利用雙向鏈表結構維護了某一時刻需要執行的定時任務列表
Wheel
時間輪,是一個HashedWheelBucket數組,數組數量越多,定時任務管理的時間精度越精確。tick每走一格都會將對應的wheel數組里面的bucket拿出來進行調度。
Worker
Worker繼承自Runnable,HashedWheelTimer必須通過Worker線程操作HashedWheelTimer中的定時任務。Worker是整個HashedWheelTimer的執行流程管理者,控制了定時任務分配、全局deadline時間計算、管理未執行的定時任務、時鍾計算、未執行定時任務回收處理。
HashedWheelTimeout
是HashedWheelTimer的執行單位,維護了其所屬的HashedWheelTimer和HashedWheelBucket的引用、需要執行的任務邏輯、當前輪次以及當前任務的超時時間(不變)等,可以認為是自定義任務的一層Wrapper。
HashedWheelBucket
HashedWheelBucket維護了hash到其內的所有HashedWheelTimeout結構,是一個雙向隊列。
HashedWheelTimer的構造器
在初始化RepeatedTimer實例的時候會實例化一個HashedWheelTimer:
new HashedWheelTimer(new NamedThreadFactory(name, true), 1, TimeUnit.MILLISECONDS, 2048)
然后調用HashedWheelTimer的構造器:
private final HashedWheelBucket[] wheel;
private final int mask;
private final long tickDuration;
private final Worker worker = new Worker();
private final Thread workerThread;
private final long maxPendingTimeouts;
private static final int INSTANCE_COUNT_LIMIT = 256;
private static final AtomicInteger instanceCounter = new AtomicInteger();
private static final AtomicBoolean warnedTooManyInstances = new AtomicBoolean();
public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel) {
tickDuration
this(threadFactory, tickDuration, unit, ticksPerWheel, -1);
}
public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel,
long maxPendingTimeouts) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
//unit = MILLISECONDS
if (unit == null) {
throw new NullPointerException("unit");
}
if (tickDuration <= 0) {
throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
}
if (ticksPerWheel <= 0) {
throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
}
// Normalize ticksPerWheel to power of two and initialize the wheel.
// 創建一個HashedWheelBucket數組
// 創建時間輪基本的數據結構,一個數組。長度為不小於ticksPerWheel的最小2的n次方
wheel = createWheel(ticksPerWheel);
// 這是一個標示符,用來快速計算任務應該呆的格子。
// 我們知道,給定一個deadline的定時任務,其應該呆的格子=deadline%wheel.length.但是%操作是個相對耗時的操作,所以使用一種變通的位運算代替:
// 因為一圈的長度為2的n次方,mask = 2^n-1后低位將全部是1,然后deadline&mast == deadline%wheel.length
// java中的HashMap在進行hash之后,進行index的hash尋址尋址的算法也是和這個一樣的
mask = wheel.length - 1;
// Convert tickDuration to nanos.
//tickDuration傳入是1的話,這里會轉換成1000000
this.tickDuration = unit.toNanos(tickDuration);
// Prevent overflow.
// 校驗是否存在溢出。即指針轉動的時間間隔不能太長而導致tickDuration*wheel.length>Long.MAX_VALUE
if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
throw new IllegalArgumentException(String.format(
"tickDuration: %d (expected: 0 < tickDuration in nanos < %d", tickDuration, Long.MAX_VALUE
/ wheel.length));
}
//將worker包裝成thread
workerThread = threadFactory.newThread(worker);
//maxPendingTimeouts = -1
this.maxPendingTimeouts = maxPendingTimeouts;
//如果HashedWheelTimer實例太多,那么就會打印一個error日志
if (instanceCounter.incrementAndGet() > INSTANCE_COUNT_LIMIT
&& warnedTooManyInstances.compareAndSet(false, true)) {
reportTooManyInstances();
}
}
這個構造器里面主要做一些初始化的工作。
- 初始化一個wheel數據,我們這里初始化的數組長度為2048.
- 初始化mask,用來計算槽位的下標,類似於hashmap的槽位的算法,因為wheel的長度已經是一個2的n次方,所以2^n-1后低位將全部是1,用&可以快速的定位槽位,比%耗時更低
- 初始化tickDuration,這里會將傳入的tickDuration轉化成納秒,那么這里是1000,000
- 校驗整個時間輪走完的時間不能過長
- 包裝worker線程
- 因為HashedWheelTimer是一個很消耗資源的一個結構,所以校驗HashedWheelTimer實例不能太多,如果太多會打印error日志
啟動timer
時間輪算法中並不需要手動的去調用start方法來啟動,而是在添加節點的時候會啟動時間輪。
我們在RepeatedTimer的schedule方法里會調用newTimeout向時間輪中添加一個任務。
HashedWheelTimer#newTimeout
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
if (task == null) {
throw new NullPointerException("task");
}
if (unit == null) {
throw new NullPointerException("unit");
}
long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
pendingTimeouts.decrementAndGet();
throw new RejectedExecutionException("Number of pending timeouts (" + pendingTimeoutsCount
+ ") is greater than or equal to maximum allowed pending "
+ "timeouts (" + maxPendingTimeouts + ")");
}
// 如果時間輪沒有啟動,則啟動
start();
// Add the timeout to the timeout queue which will be processed on the next tick.
// During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
// Guard against overflow.
//在delay為正數的情況下,deadline是不可能為負數
//如果為負數,那么說明超過了long的最大值
if (delay > 0 && deadline < 0) {
deadline = Long.MAX_VALUE;
}
// 這里定時任務不是直接加到對應的格子中,而是先加入到一個隊列里,然后等到下一個tick的時候,
// 會從隊列里取出最多100000個任務加入到指定的格子中
HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
//Worker會去處理timeouts隊列里面的數據
timeouts.add(timeout);
return timeout;
}
在這個方法中,在校驗之后會調用start方法啟動時間輪,然后設置deadline,這個時間等於時間輪啟動的時間點+延遲的的時間;
然后新建一個HashedWheelTimeout實例,會直接加入到timeouts隊列中去,timeouts對列會在worker的run方法里面取出來放入到wheel中進行處理。
然后我們來看看start方法:
HashedWheelTimer#start
private static final AtomicIntegerFieldUpdater<HashedWheelTimer> workerStateUpdater = AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class,"workerState");
private volatile int workerState;
//不需要你主動調用,當有任務添加進來的的時候他就會跑
public void start() {
//workerState一開始的時候是0(WORKER_STATE_INIT),然后才會設置為1(WORKER_STATE_STARTED)
switch (workerStateUpdater.get(this)) {
case WORKER_STATE_INIT:
//使用cas來獲取啟動調度的權力,只有競爭到的線程允許來進行實例啟動
if (workerStateUpdater.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
//如果成功設置了workerState,那么就調用workerThread線程
workerThread.start();
}
break;
case WORKER_STATE_STARTED:
break;
case WORKER_STATE_SHUTDOWN:
throw new IllegalStateException("cannot be started once stopped");
default:
throw new Error("Invalid WorkerState");
}
// 等待worker線程初始化時間輪的啟動時間
// Wait until the startTime is initialized by the worker.
while (startTime == 0) {
try {
//這里使用countDownLauch來確保調度的線程已經被啟動
startTimeInitialized.await();
} catch (InterruptedException ignore) {
// Ignore - it will be ready very soon.
}
}
}
由這里我們可以看出,啟動時間輪是不需要手動去調用的,而是在有任務的時候會自動運行,防止在沒有任務的時候空轉浪費資源。
在start方法里面會使用AtomicIntegerFieldUpdater的方式來更新workerState這個變量,如果沒有啟動過那么直接在cas成功之后調用start方法啟動workerThread線程。
如果workerThread還沒運行,那么會在while循環中等待,直到workerThread運行為止才會往下運行。
開始時間輪轉
時間輪的運轉是在Worker的run方法中進行的:
Worker#run
private final Set<Timeout> unprocessedTimeouts = new HashSet<>();
private long tick;
public void run() {
// Initialize the startTime.
startTime = System.nanoTime();
if (startTime == 0) {
// We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
startTime = 1;
}
//HashedWheelTimer的start方法會繼續往下運行
// Notify the other threads waiting for the initialization at start().
startTimeInitialized.countDown();
do {
//返回的是當前的nanoTime- startTime
//也就是返回的是 每 tick 一次的時間間隔
final long deadline = waitForNextTick();
if (deadline > 0) {
//算出時間輪的槽位
int idx = (int) (tick & mask);
//移除cancelledTimeouts中的bucket
// 從bucket中移除timeout
processCancelledTasks();
HashedWheelBucket bucket = wheel[idx];
// 將newTimeout()方法中加入到待處理定時任務隊列中的任務加入到指定的格子中
transferTimeoutsToBuckets();
bucket.expireTimeouts(deadline);
tick++;
}
// 校驗如果workerState是started狀態,那么就一直循環
} while (workerStateUpdater.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
// Fill the unprocessedTimeouts so we can return them from stop() method.
for (HashedWheelBucket bucket : wheel) {
bucket.clearTimeouts(unprocessedTimeouts);
}
for (;;) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
break;
}
//如果有沒有被處理的timeout,那么加入到unprocessedTimeouts對列中
if (!timeout.isCancelled()) {
unprocessedTimeouts.add(timeout);
}
}
//處理被取消的任務
processCancelledTasks();
}
- 這個方法首先會設置一個時間輪的開始時間startTime,然后調用startTimeInitialized的countDown讓被阻塞的線程往下運行
- 調用waitForNextTick等待到下次tick的到來,並返回當次的tick時間-startTime
- 通過&的方式獲取時間輪的槽位
- 移除掉被取消的task
- 將timeouts中的任務轉移到對應的wheel槽位中,如果槽位中不止一個bucket,那么串成一個鏈表
- 執行格子中的到期任務
- 遍歷整個wheel,將過期的bucket放入到unprocessedTimeouts隊列中
- 將timeouts中過期的bucket放入到unprocessedTimeouts隊列中
上面所有的過期但未被處理的bucket會在調用stop方法的時候返回unprocessedTimeouts隊列中的數據。所以unprocessedTimeouts中的數據只是做一個記錄,並不會再次被執行。
時間輪的所有處理過程都在do-while循環中被處理,我們下面一個個分析
處理被取消的任務
Worker#processCancelledTasks
private void processCancelledTasks() {
for (;;) {
HashedWheelTimeout timeout = cancelledTimeouts.poll();
if (timeout == null) {
// all processed
break;
}
try {
timeout.remove();
} catch (Throwable t) {
if (LOG.isWarnEnabled()) {
LOG.warn("An exception was thrown while process a cancellation task", t);
}
}
}
}
這個方法相當的簡單,因為在調用HashedWheelTimer的stop方法的時候會將要取消的HashedWheelTimeout實例放入到cancelledTimeouts隊列中,所以這里只需要循環把隊列中的數據取出來,然后調用HashedWheelTimeout的remove方法將自己在bucket移除就好了
HashedWheelTimeout#remove
void remove() {
HashedWheelBucket bucket = this.bucket;
if (bucket != null) {
//這里面涉及到鏈表的引用摘除,十分清晰易懂,想了解的可以去看看
bucket.remove(this);
} else {
timer.pendingTimeouts.decrementAndGet();
}
}
轉移數據到時間輪中
Worker#transferTimeoutsToBuckets
private void transferTimeoutsToBuckets() {
// transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just
// adds new timeouts in a loop.
// 每次tick只處理10w個任務,以免阻塞worker線程
for (int i = 0; i < 100000; i++) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
// all processed
break;
}
//已經被取消了;
if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
// Was cancelled in the meantime.
continue;
}
//calculated = tick 次數
long calculated = timeout.deadline / tickDuration;
// 計算剩余的輪數, 只有 timer 走夠輪數, 並且到達了 task 所在的 slot, task 才會過期
timeout.remainingRounds = (calculated - tick) / wheel.length;
//如果任務在timeouts隊列里面放久了, 以至於已經過了執行時間, 這個時候就使用當前tick, 也就是放到當前bucket, 此方法調用完后就會被執行
final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
//// 算出任務應該插入的 wheel 的 slot, slotIndex = tick 次數 & mask, mask = wheel.length - 1
int stopIndex = (int) (ticks & mask);
HashedWheelBucket bucket = wheel[stopIndex];
//將timeout加入到bucket鏈表中
bucket.addTimeout(timeout);
}
}
- 每次調用這個方法會處理10w個任務,以免阻塞worker線程
- 在校驗之后會用timeout的deadline除以每次tick運行的時間tickDuration得出需要tick多少次才會運行這個timeout的任務
- 由於timeout的deadline實際上還包含了worker線程啟動到timeout加入隊列這段時間,所以在算remainingRounds的時候需要減去當前的tick次數
|_____________________|____________
worker啟動時間 timeout任務加入時間
- 最后根據計算出來的ticks來&算出wheel的槽位,加入到bucket鏈表中
執行到期任務
在worker的run方法的do-while循環中,在根據當前的tick拿到wheel中的bucket后會調用expireTimeouts方法來處理這個bucket的到期任務
HashedWheelBucket#expireTimeouts
// 過期並執行格子中的到期任務,tick到該格子的時候,worker線程會調用這個方法,
//根據deadline和remainingRounds判斷任務是否過期
public void expireTimeouts(long deadline) {
HashedWheelTimeout timeout = head;
// process all timeouts
//遍歷格子中的所有定時任務
while (timeout != null) {
// 先保存next,因為移除后next將被設置為null
HashedWheelTimeout next = timeout.next;
if (timeout.remainingRounds <= 0) {
//從bucket鏈表中移除當前timeout,並返回鏈表中下一個timeout
next = remove(timeout);
//如果timeout的時間小於當前的時間,那么就調用expire執行task
if (timeout.deadline <= deadline) {
timeout.expire();
} else {
//不可能發生的情況,就是說round已經為0了,deadline卻>當前槽的deadline
// The timeout was placed into a wrong slot. This should never happen.
throw new IllegalStateException(String.format("timeout.deadline (%d) > deadline (%d)",
timeout.deadline, deadline));
}
} else if (timeout.isCancelled()) {
next = remove(timeout);
} else {
//因為當前的槽位已經過了,說明已經走了一圈了,把輪數減一
timeout.remainingRounds--;
}
//把指針放置到下一個timeout
timeout = next;
}
}
expireTimeouts方法會根據當前tick到的槽位,然后獲取槽位中的bucket並找到鏈表中到期的timeout並執行
- 因為每一次的指針都會指向bucket中的下一個timeout,所以timeout為空時說明整個鏈表已經遍歷完畢,所以用while循環做非空校驗
- 因為沒一次循環都會把當前的輪數大於零的做減一處理,所以當輪數小於或等於零的時候就需要把當前的timeout移除bucket鏈表
- 在校驗deadline之后執行expire方法,這里會真正進行任務調用
HashedWheelTimeout#task
public void expire() {
if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
return;
}
try {
task.run(this);
} catch (Throwable t) {
if (LOG.isWarnEnabled()) {
LOG.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
}
}
}
這里這個task就是在schedule方法中構建的timerTask實例,調用timerTask的run方法會調用到外層的RepeatedTimer的run方法,從而調用到RepeatedTimer子類實現的onTrigger方法。
到這里Jraft的定時調度就講完了,感覺還是很有意思的。