時間輪
什么是時間輪?
簡單來說,時間輪是一種高效利用線程資源進行批量化調度的一種調度模型。
通過把大批量的調度任務全部綁定到同一個調度器上,使用這一個調度器來進行所有任務的管理、觸發、以及運行。
所以時間輪的模型能夠高效管理各種延時任務、周期任務、通知任務。
時間輪是以時間作為刻度組成的一個環形隊列,所以叫做時間輪。這個環形隊列采用數組來實現HashedWheelBucket[],數組的每個元素稱為槽,每個槽可以存放一個定時任務列表,叫HashedWheelBucket,它是一個雙向鏈表,鏈表的每個節點表示一個定時任務項(HashedWheelTimeout),其中封裝了真正的定時任務TimerTask。
時間輪由多個時間格組成,每個時間格代表當前時間輪的基本時間跨度(ticketDuration),其中時間輪的時間格的個數是固定的。
如上圖,有16個時間格(槽),假設每個時間格的單位是1s,那么整個時間輪走完一圈需要16s,每秒鍾(即時間格的單位,也可以為1ms、1min、1h等)指針會沿着順時針方向轉動一格,通過指針移動,來獲得每個時間格中的任務列表,然后遍歷並執行這一個時間格內的雙向鏈表的每個任務,依此循環。
HashedWheelTimer是什么?
Netty的 HashedWheelTimer 是一個粗略的定時器實現,之所以稱之為粗略的實現是因為該時間輪並沒有嚴格的准時執行定時任務,而是在每隔一個時間間隔之后的時間節點執行,並執行當前時間節點之前到期的定時任務
不過具體的定時任務的時間執行精度可以通過調節 HashedWheelTimer 構造方法的時間間隔的大小來進行調節,在大多數網絡應用的情況下,由於 IO 延遲的存在,並不會嚴格要求具體的時間執行精度,所以默認的 100ms 時間間隔可以滿足大多數的情況,不需要再花精力去調節該時間精度
Netty時間輪HashedWheelTimer的使用:
/** * 構建HashedWheelTimer時間輪 * <p> * threadFactory:創建處理任務的線程工廠 * tickDuration:100 ,表示每個時間格代表當前時間輪的基本時間跨度,這里是100ms,也就是指針100ms跳動一次,每次跳動一個窗格 * ticksPerWheel:512,表示時間輪上一共有多少個時間格,分配的時間格越多,占用內存空間就越大 * leakDetection:是否開啟內存泄漏檢測 * maxPendingTimeouts[可選參數],最大允許等待的任務數,默認沒有限制 * <p> * 最后通過newTimeout()把需要延遲執行的任務添加到時間輪中 */ private static final HashedWheelTimer HASHED_WHEEL_TIMER = new HashedWheelTimer( new DefaultThreadFactory("demo-timer"), 100, TimeUnit.MILLISECONDS, 512, true); public static void main(String[] args) { System.out.println("延時任務提交"); // 延時多久執行 long delay = 10L; HASHED_WHEEL_TIMER.newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { System.out.println("延時任務觸發"); } }, delay, TimeUnit.SECONDS); }
時間輪運行大致流程:
HashedWheelTimer
HashedWheelTimer維護一個名為“wheel”的數據結構。
private final HashedWheelBucket[] wheel; // 時間輪初始化 HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
簡單地說,wheel是一個TimerTasks的哈希表,它的哈希函數是“任務的截止日期”。每個時間輪的時間格數ticksPerWheel默認是512。如果要安排大量的超時,可以指定更大的值。
只需要一個HashedWheelTimer實例即可,多個線程共享它。
字段及說明:
字段 | 說明 |
tickDuration | 時間格跨度,默認100ms |
ticksPerWheel | 時間輪的格子數,默認512 |
maxPendingTimeouts | 時間輪中任務的最大數量 |
deadline | 延時任務的截止時間,值為當前時間+延時任務的延時時間-時間輪啟動時間 |
tick | 時間輪啟動以來指針總的轉動次數 |
remainingRounds | 槽中延時任務剩余的圈(輪)數,為0時,執行該延時任務 |
構造方法:
/** * Creates a new timer. * * @param threadFactory 創建線程的工廠 * @param tickDuration 每格的時間間隔,默認100ms,0.1秒 * @param unit 時間單位,默認為毫秒 * @param ticksPerWheel 時間輪的格子數,默認為512;如果傳入的不是2的N次方,則會調整為大於等於該參數的第一個2的N次方,好處是可以優化hash值的計算 * @param leakDetection 如果false,那么只有工作線程不是后台線程時才會追蹤資源泄露,這個參數可以忽略 * @param maxPendingTimeouts 最大的pending數量(時間輪中任務的最大數量),超過這個值之后調用將拋出異常,0或者負數表示沒有限制,默認為-1 * @param taskExecutor 任務線程池,用於執行提交的任務,調用者負責在不需要時關閉它 * * @throws NullPointerException if either of {@code threadFactory} and {@code unit} is {@code null} * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is <= 0 */ public HashedWheelTimer( ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection, long maxPendingTimeouts, Executor taskExecutor) { checkNotNull(threadFactory, "threadFactory"); checkNotNull(unit, "unit"); checkPositive(tickDuration, "tickDuration"); checkPositive(ticksPerWheel, "ticksPerWheel"); this.taskExecutor = checkNotNull(taskExecutor, "taskExecutor"); // 將ticksPerWheel(輪子上的時間格數)向上取值為2的次冪,方便進行求商和取余計算,並初始化時間輪 wheel = createWheel(ticksPerWheel); // mask 的設計和HashMap一樣,通過限制數組的大小為2的次方,利用位運算來替代取模運算,提高性能 mask = wheel.length - 1; // Convert tickDuration to nanos. long duration = unit.toNanos(tickDuration); // 防止溢出 // tickDuration * ticksPerWheel 必須小於Long.MAX_VALUE if (duration >= Long.MAX_VALUE / wheel.length) { throw new IllegalArgumentException(String.format( "tickDuration: %d (expected: 0 < tickDuration in nanos < %d", tickDuration, Long.MAX_VALUE / wheel.length)); } // tickDuration 不能小於 1ms if (duration < MILLISECOND_NANOS) { logger.warn("Configured tickDuration {} smaller then {}, using 1ms.", tickDuration, MILLISECOND_NANOS); this.tickDuration = MILLISECOND_NANOS; } else { this.tickDuration = duration; } // 創建工作線程,用於指針轉動和觸發時間格里的延時任務的執行 workerThread = threadFactory.newThread(worker); leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null; // 時間輪中任務的最大數量 this.maxPendingTimeouts = maxPendingTimeouts; // HashedWheelTimer的實例數量大於64的話,打印error日志 if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT && WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) { reportTooManyInstances(); } }
構造流程:
添加任務:
工作線程 run() 邏輯(時間輪啟動邏輯):
newTimeout 方法添加延時任務:
/** * 添加延時任務 * @param task 任務 * @param delay 延時時間 * @param unit 延時時間單位 * @return */ @Override public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) { checkNotNull(task, "task"); checkNotNull(unit, "unit"); // 任務數+1 long pendingTimeoutsCount = pendingTimeouts.incrementAndGet(); // 如果任務數超過最大限制,那么則拋出異常 if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) { pendingTimeouts.decrementAndGet(); throw new RejectedExecutionException("Number of pending timeouts (" + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending " + "timeouts (" + maxPendingTimeouts + ")"); } // 啟動時間輪開啟任務工作線程workerThread start(); // 將延時任務添加到延時隊列中,該隊列將在下一個滴答聲中處理(指針的下一次轉動) // 在處理過程中,所有排隊的hashedwheeltimeout將被添加到正確的HashedWheelBucket // 計算延時任務的延時時間,值為當前的時間+當前任務執行的延遲時間-時間輪啟動的時間 long deadline = System.nanoTime() + unit.toNanos(delay) - startTime; // 防止溢出 if (delay > 0 && deadline < 0) { deadline = Long.MAX_VALUE; } // 封裝延時任務 HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline); // 將延時任務保存到延時任務隊列中 timeouts.add(timeout); return timeout; }
start(),啟動時間輪,執行Worker封裝的線程,Worker源碼:
/** * 指針轉動和延時任務執行的線程 */ private final class Worker implements Runnable { // 用於記錄未執行的延時任務 private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>(); // 總的tick數(指針嘀嗒的次數) private long tick; @Override public void run() { // 工作線程(時間輪)啟動時間 startTime = System.nanoTime(); if (startTime == 0) { // 我們在這里使用0作為未初始化值的指示符,所以要確保初始化時它不是0 startTime = 1; } // 喚醒被阻塞的start()方法,通知時間輪已經啟動完畢 startTimeInitialized.countDown(); do { // 這里會休眠tick的時間,模擬指針走動 final long deadline = waitForNextTick(); if (deadline > 0) { // 計算時間輪的槽位 int idx = (int) (tick & mask); // 清理已經取消的任務 processCancelledTasks(); // 得到當前指針位置的時間槽 HashedWheelBucket bucket = wheel[idx]; // 將newTimeout()方法中加入到待處理定時任務隊列中的任務加入到指定的格子中 transferTimeoutsToBuckets(); // 運行目前指針指向的槽中的bucket鏈表中的任務,執行到期的延時任務,交給taskExecutor線程池去執行 bucket.expireTimeouts(deadline); tick++; } } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED); // Fill the unprocessedTimeouts so we can return them from stop() method. for (HashedWheelBucket bucket: wheel) { // 清除時間輪中不需要處理的任務 bucket.clearTimeouts(unprocessedTimeouts); } for (;;) { // 遍歷任務隊列,發現如果有任務被取消,則添加到unprocessedTimeouts,也就是不需要處理的隊列中。 HashedWheelTimeout timeout = timeouts.poll(); if (timeout == null) { break; } if (!timeout.isCancelled()) { // 如果延時任務沒被取消,記錄到未執行的任務Set集合中 unprocessedTimeouts.add(timeout); } } // 處理被取消的任務 processCancelledTasks(); } /** * 將延時任務隊列中等待添加到時間輪中的延時任務轉移到時間輪的指定位置 */ private void transferTimeoutsToBuckets() { // 每次轉移10w個延時任務 for (int i = 0; i < 100000; i++) { // 從隊列中出隊一個延時任務 HashedWheelTimeout timeout = timeouts.poll(); if (timeout == null) { // all processed break; } if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) { // Was cancelled in the meantime. continue; } // 到期一共需要走多少時間格(tick次數),deadline表示當前任務的延遲時間(從時間輪啟動時計算),tickDuration表示時間格的時間間隔 long calculated = timeout.deadline / tickDuration; // tick已經走了的時間格,到期一共還需要需要走多少圈 timeout.remainingRounds = (calculated - tick) / wheel.length; // 如果延時任務在隊列中等待太久已經過了執行時間,那么這個時候就使用當前tick,也就是放在當前的bucket,此方法調用完后就會被執行 final long ticks = Math.max(calculated, tick); // 槽的索引,stopIndex = tick 次數 & mask, mask = wheel.length - 1 int stopIndex = (int) (ticks & mask); // 根據索引該任務應該放到的槽 HashedWheelBucket bucket = wheel[stopIndex]; // 將任務添加到槽中,鏈表末尾 bucket.addTimeout(timeout); } } /** * 處理取消掉的延時任務 */ private void processCancelledTasks() { for (;;) { HashedWheelTimeout timeout = cancelledTimeouts.poll(); if (timeout == null) { // all processed break; } try { timeout.remove(); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("An exception was thrown while process a cancellation task", t); } } } } /** * 從時間輪的啟動時間startTime和當前的tick數(指針跳動次數)計算下一次指針跳動的時間,然后休眠等待下一次指針跳動時間到來 */ private long waitForNextTick() { // deadline返回的是下一次時間輪指針跳動的時間與時間格啟動的時間間隔 long deadline = tickDuration * (tick + 1); for (;;) { // 計算當前時間距離啟動時間的時間間隔 final long currentTime = System.nanoTime() - startTime; // 距離下一次指針跳動還需休眠多長時間 long sleepTimeMs = (deadline - currentTime + 999999) / 1000000; // 到了指針調到下一個槽位的時間 if (sleepTimeMs <= 0) { if (currentTime == Long.MIN_VALUE) { return -Long.MAX_VALUE; } else { return currentTime; } } try { // 表示距離下一次指針跳動還需要一段時間,所以休眠等待時間的到來 Thread.sleep(sleepTimeMs); } catch (InterruptedException ignored) { if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) { return Long.MIN_VALUE; } } } } /** * 記錄未執行的延時任務 */ public Set<Timeout> unprocessedTimeouts() { return Collections.unmodifiableSet(unprocessedTimeouts); } }
HashedWheelTimer完整源碼:

package io.netty.util; import static io.netty.util.internal.ObjectUtil.checkInRange; import static io.netty.util.internal.ObjectUtil.checkPositive; import static io.netty.util.internal.ObjectUtil.checkNotNull; import io.netty.util.concurrent.ImmediateExecutor; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import java.util.Collections; import java.util.HashSet; import java.util.Queue; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLong; import static io.netty.util.internal.StringUtil.simpleClassName; /* * Netty時間輪 */ public class HashedWheelTimer implements Timer { static final InternalLogger logger = InternalLoggerFactory.getInstance(HashedWheelTimer.class); private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger(); private static final AtomicBoolean WARNED_TOO_MANY_INSTANCES = new AtomicBoolean(); private static final int INSTANCE_COUNT_LIMIT = 64; private static final long MILLISECOND_NANOS = TimeUnit.MILLISECONDS.toNanos(1); private static final ResourceLeakDetector<HashedWheelTimer> leakDetector = ResourceLeakDetectorFactory.instance() .newResourceLeakDetector(HashedWheelTimer.class, 1); private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState"); private final ResourceLeakTracker<HashedWheelTimer> leak; // 指針轉動和延時任務執行的線程 private final Worker worker = new Worker(); // worker任務封裝的工作線程,用於指針轉動和觸發時間格里的延時任務的執行 private final Thread workerThread; public static final int WORKER_STATE_INIT = 0; public static final int WORKER_STATE_STARTED = 1; public static final int WORKER_STATE_SHUTDOWN = 2; @SuppressWarnings({"unused", "FieldMayBeFinal"}) private volatile int workerState; // 0 - init, 1 - started, 2 - shut down // 每個時間格的時間跨度,默認為100ms private final long tickDuration; // 時間輪(環形數組),HashedWheelBucket為每個時間格的槽 private final HashedWheelBucket[] wheel; private final int mask; private final CountDownLatch startTimeInitialized = new CountDownLatch(1); // 延時任務隊列,隊列中為等待被添加到時間輪的延時任務 private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue(); // 保存已經取消的延時任務的隊列 private final Queue<HashedWheelTimeout> cancelledTimeouts = PlatformDependent.newMpscQueue(); // 記錄當前的任務數 private final AtomicLong pendingTimeouts = new AtomicLong(0); // 最大的任務數 private final long maxPendingTimeouts; // 執行延時任務的線程池 private final Executor taskExecutor; // 工作線程啟動時間 private volatile long startTime; ////////////////////////// 構造器 start ////////////////////////// public HashedWheelTimer() { this(Executors.defaultThreadFactory()); } public HashedWheelTimer(long tickDuration, TimeUnit unit) { this(Executors.defaultThreadFactory(), tickDuration, unit); } public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) { this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel); } /** * 使用默認的tickDuration(時間格跨度,100ms)和默認的ticksPerWheel(時間格總數,512)創建一個新的計時器(時間輪) */ public HashedWheelTimer(ThreadFactory threadFactory) { this(threadFactory, 100, TimeUnit.MILLISECONDS); } public HashedWheelTimer( ThreadFactory threadFactory, long tickDuration, TimeUnit unit) { this(threadFactory, tickDuration, unit, 512); } public HashedWheelTimer( ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel) { this(threadFactory, tickDuration, unit, ticksPerWheel, true); } public HashedWheelTimer( ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection) { this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection, -1); } public HashedWheelTimer( ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection, long maxPendingTimeouts) { this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection, maxPendingTimeouts, ImmediateExecutor.INSTANCE); } /** * Creates a new timer. * * @param threadFactory 創建線程的工廠 * @param tickDuration 每格的時間間隔,默認100ms,0.1秒 * @param unit 時間單位,默認為毫秒 * @param ticksPerWheel 時間輪的格子數,默認為512;如果傳入的不是2的N次方,則會調整為大於等於該參數的第一個2的N次方,好處是可以優化hash值的計算 * @param leakDetection 如果false,那么只有工作線程不是后台線程時才會追蹤資源泄露,這個參數可以忽略 * @param maxPendingTimeouts 最大的pending數量(時間輪中任務的最大數量),超過這個值之后調用將拋出異常,0或者負數表示沒有限制,默認為-1 * @param taskExecutor 任務線程池,用於執行提交的任務,調用者負責在不需要時關閉它 * * @throws NullPointerException if either of {@code threadFactory} and {@code unit} is {@code null} * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is <= 0 */ public HashedWheelTimer( ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection, long maxPendingTimeouts, Executor taskExecutor) { checkNotNull(threadFactory, "threadFactory"); checkNotNull(unit, "unit"); checkPositive(tickDuration, "tickDuration"); checkPositive(ticksPerWheel, "ticksPerWheel"); this.taskExecutor = checkNotNull(taskExecutor, "taskExecutor"); // 將ticksPerWheel(輪子上的時間格數)向上取值為2的次冪,方便進行求商和取余計算,並初始化時間輪 wheel = createWheel(ticksPerWheel); // mask 的設計和HashMap一樣,通過限制數組的大小為2的次方,利用位運算來替代取模運算,提高性能 mask = wheel.length - 1; // Convert tickDuration to nanos. long duration = unit.toNanos(tickDuration); // 防止溢出 // tickDuration * ticksPerWheel 必須小於Long.MAX_VALUE if (duration >= Long.MAX_VALUE / wheel.length) { throw new IllegalArgumentException(String.format( "tickDuration: %d (expected: 0 < tickDuration in nanos < %d", tickDuration, Long.MAX_VALUE / wheel.length)); } // tickDuration 不能小於 1ms if (duration < MILLISECOND_NANOS) { logger.warn("Configured tickDuration {} smaller then {}, using 1ms.", tickDuration, MILLISECOND_NANOS); this.tickDuration = MILLISECOND_NANOS; } else { this.tickDuration = duration; } // 創建工作線程,用於指針轉動和觸發時間格里的延時任務的執行 workerThread = threadFactory.newThread(worker); leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null; // 時間輪中任務的最大數量 this.maxPendingTimeouts = maxPendingTimeouts; // HashedWheelTimer的實例數量大於64的話,打印error日志 if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT && WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) { reportTooManyInstances(); } } ////////////////////////// 構造器 end ////////////////////////// @Override protected void finalize() throws Throwable { try { super.finalize(); } finally { // This object is going to be GCed and it is assumed the ship has sailed to do a proper shutdown. If // we have not yet shutdown then we want to make sure we decrement the active instance count. if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) { INSTANCE_COUNTER.decrementAndGet(); } } } /** * 初始化時間輪環形數組 * @param ticksPerWheel * @return */ private static HashedWheelBucket[] createWheel(int ticksPerWheel) { // ticksPerWheel不能大於2^30 checkInRange(ticksPerWheel, 1, 1073741824, "ticksPerWheel"); // 將ticksPerWheel(輪子上的時間格數)向上取值為2的次冪 ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel); // 創建時間輪環形數組 HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel]; for (int i = 0; i < wheel.length; i ++) { wheel[i] = new HashedWheelBucket(); } return wheel; } /** * 將ticksPerWheel(輪子上的時間格數)向上取值為2的次冪 */ private static int normalizeTicksPerWheel(int ticksPerWheel) { int normalizedTicksPerWheel = 1; while (normalizedTicksPerWheel < ticksPerWheel) { normalizedTicksPerWheel <<= 1; } return normalizedTicksPerWheel; } /** * 顯式啟動后台線程。即使您沒有調用此方法,后台線程也會按需自動啟動 */ public void start() { switch (WORKER_STATE_UPDATER.get(this)) { case WORKER_STATE_INIT: if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) { workerThread.start(); } break; case WORKER_STATE_STARTED: break; case WORKER_STATE_SHUTDOWN: throw new IllegalStateException("cannot be started once stopped"); default: throw new Error("Invalid WorkerState"); } // Wait until the startTime is initialized by the worker. while (startTime == 0) { try { startTimeInitialized.await(); } catch (InterruptedException ignore) { // Ignore - it will be ready very soon. } } } @Override public Set<Timeout> stop() { if (Thread.currentThread() == workerThread) { throw new IllegalStateException( HashedWheelTimer.class.getSimpleName() + ".stop() cannot be called from " + TimerTask.class.getSimpleName()); } if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) { // workerState can be 0 or 2 at this moment - let it always be 2. if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) { INSTANCE_COUNTER.decrementAndGet(); if (leak != null) { boolean closed = leak.close(this); assert closed; } } return Collections.emptySet(); } try { boolean interrupted = false; while (workerThread.isAlive()) { workerThread.interrupt(); try { workerThread.join(100); } catch (InterruptedException ignored) { interrupted = true; } } if (interrupted) { Thread.currentThread().interrupt(); } } finally { INSTANCE_COUNTER.decrementAndGet(); if (leak != null) { boolean closed = leak.close(this); assert closed; } } return worker.unprocessedTimeouts(); } /** * 添加延時任務 * @param task 任務 * @param delay 延時時間 * @param unit 延時時間單位 * @return */ @Override public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) { checkNotNull(task, "task"); checkNotNull(unit, "unit"); // 任務數+1 long pendingTimeoutsCount = pendingTimeouts.incrementAndGet(); // 如果任務數超過最大限制,那么則拋出異常 if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) { pendingTimeouts.decrementAndGet(); throw new RejectedExecutionException("Number of pending timeouts (" + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending " + "timeouts (" + maxPendingTimeouts + ")"); } // 啟動時間輪開啟任務工作線程workerThread start(); // 將延時任務添加到延時隊列中,該隊列將在下一個滴答聲中處理(指針的下一次轉動) // 在處理過程中,所有排隊的hashedwheeltimeout將被添加到正確的HashedWheelBucket // 計算延時任務的延時時間,值為當前的時間+當前任務執行的延遲時間-時間輪啟動的時間 long deadline = System.nanoTime() + unit.toNanos(delay) - startTime; // 防止溢出 if (delay > 0 && deadline < 0) { deadline = Long.MAX_VALUE; } // 封裝延時任務 HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline); // 將延時任務保存到延時任務隊列中 timeouts.add(timeout); return timeout; } /** * Returns the number of pending timeouts of this {@link Timer}. */ public long pendingTimeouts() { return pendingTimeouts.get(); } private static void reportTooManyInstances() { if (logger.isErrorEnabled()) { String resourceType = simpleClassName(HashedWheelTimer.class); logger.error("You are creating too many " + resourceType + " instances. " + resourceType + " is a shared resource that must be reused across the JVM, " + "so that only a few instances are created."); } } /** * 指針轉動和延時任務執行的線程 */ private final class Worker implements Runnable { // 用於記錄未執行的延時任務 private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>(); // 總的tick數(指針嘀嗒的次數) private long tick; @Override public void run() { // 工作線程(時間輪)啟動時間 startTime = System.nanoTime(); if (startTime == 0) { // 我們在這里使用0作為未初始化值的指示符,所以要確保初始化時它不是0 startTime = 1; } // 喚醒被阻塞的start()方法,通知時間輪已經啟動完畢 startTimeInitialized.countDown(); do { // 這里會休眠tick的時間,模擬指針走動 final long deadline = waitForNextTick(); if (deadline > 0) { // 計算時間輪的槽位 int idx = (int) (tick & mask); // 清理已經取消的任務 processCancelledTasks(); // 得到當前指針位置的時間槽 HashedWheelBucket bucket = wheel[idx]; // 將newTimeout()方法中加入到待處理定時任務隊列中的任務加入到指定的格子中 transferTimeoutsToBuckets(); // 運行目前指針指向的槽中的bucket鏈表中的任務,執行到期的延時任務,交給taskExecutor線程池去執行 bucket.expireTimeouts(deadline); tick++; } } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED); // Fill the unprocessedTimeouts so we can return them from stop() method. for (HashedWheelBucket bucket: wheel) { // 清除時間輪中不需要處理的任務 bucket.clearTimeouts(unprocessedTimeouts); } for (;;) { // 遍歷任務隊列,發現如果有任務被取消,則添加到unprocessedTimeouts,也就是不需要處理的隊列中。 HashedWheelTimeout timeout = timeouts.poll(); if (timeout == null) { break; } if (!timeout.isCancelled()) { // 如果延時任務沒被取消,記錄到未執行的任務Set集合中 unprocessedTimeouts.add(timeout); } } // 處理被取消的任務 processCancelledTasks(); } /** * 將延時任務隊列中等待添加到時間輪中的延時任務轉移到時間輪的指定位置 */ private void transferTimeoutsToBuckets() { // 每次轉移10w個延時任務 for (int i = 0; i < 100000; i++) { // 從隊列中出隊一個延時任務 HashedWheelTimeout timeout = timeouts.poll(); if (timeout == null) { // all processed break; } if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) { // Was cancelled in the meantime. continue; } // 到期一共需要走多少時間格(tick次數),deadline表示當前任務的延遲時間(從時間輪啟動時計算),tickDuration表示時間格的時間間隔 long calculated = timeout.deadline / tickDuration; // tick已經走了的時間格,到期一共還需要需要走多少圈 timeout.remainingRounds = (calculated - tick) / wheel.length; // 如果延時任務在隊列中等待太久已經過了執行時間,那么這個時候就使用當前tick,也就是放在當前的bucket,此方法調用完后就會被執行 final long ticks = Math.max(calculated, tick); // 槽的索引,stopIndex = tick 次數 & mask, mask = wheel.length - 1 int stopIndex = (int) (ticks & mask); // 根據索引該任務應該放到的槽 HashedWheelBucket bucket = wheel[stopIndex]; // 將任務添加到槽中,鏈表末尾 bucket.addTimeout(timeout); } } /** * 處理取消掉的延時任務 */ private void processCancelledTasks() { for (;;) { HashedWheelTimeout timeout = cancelledTimeouts.poll(); if (timeout == null) { // all processed break; } try { timeout.remove(); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("An exception was thrown while process a cancellation task", t); } } } } /** * 從時間輪的啟動時間startTime和當前的tick數(指針轉動次數)計算下一次指針轉動的時間,然后休眠等待下一次指針轉動時間到來 */ private long waitForNextTick() { // deadline返回的是下一次時間輪指針轉動的時間與時間格啟動的時間間隔 long deadline = tickDuration * (tick + 1); for (;;) { // 計算當前時間距離啟動時間的時間間隔 final long currentTime = System.nanoTime() - startTime; // 距離下一次指針轉動還需休眠多長時間 long sleepTimeMs = (deadline - currentTime + 999999) / 1000000; // 到了指針調到下一個槽位的時間 if (sleepTimeMs <= 0) { if (currentTime == Long.MIN_VALUE) { return -Long.MAX_VALUE; } else { return currentTime; } } // Check if we run on windows, as if thats the case we will need // to round the sleepTime as workaround for a bug that only affect // the JVM if it runs on windows. // // See https://github.com/netty/netty/issues/356 if (PlatformDependent.isWindows()) { sleepTimeMs = sleepTimeMs / 10 * 10; if (sleepTimeMs == 0) { sleepTimeMs = 1; } } try { // 表示距離下一次指針轉動還需要一段時間,所以休眠等待時間的到來 Thread.sleep(sleepTimeMs); } catch (InterruptedException ignored) { if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) { return Long.MIN_VALUE; } } } } /** * 記錄未執行的延時任務 */ public Set<Timeout> unprocessedTimeouts() { return Collections.unmodifiableSet(unprocessedTimeouts); } } /** * 延時任務 */ private static final class HashedWheelTimeout implements Timeout, Runnable { private static final int ST_INIT = 0; private static final int ST_CANCELLED = 1; private static final int ST_EXPIRED = 2; private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state"); private final HashedWheelTimer timer; private final TimerTask task; // 任務執行的截止時間,值為當前時間+延時任務延時時間-時間輪啟動時間 private final long deadline; @SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization" }) private volatile int state = ST_INIT; // 剩下的圈(輪)數 // remainingRounds將由Worker.transferTimeoutsToBuckets()在HashedWheelTimeout被添加到正確的HashedWheelBucket之前計算和設置。 long remainingRounds; // HashedWheelTimerBucket槽中的延時任務列表是一個雙向鏈表 // 因為只有workerThread會對它進行操作,所以不需要 synchronization / volatile. HashedWheelTimeout next; HashedWheelTimeout prev; // 當前延時任務所插入時間輪的哪個槽 HashedWheelBucket bucket; HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) { this.timer = timer; this.task = task; this.deadline = deadline; } @Override public Timer timer() { return timer; } @Override public TimerTask task() { return task; } @Override public boolean cancel() { // only update the state it will be removed from HashedWheelBucket on next tick. if (!compareAndSetState(ST_INIT, ST_CANCELLED)) { return false; } // If a task should be canceled we put this to another queue which will be processed on each tick. // So this means that we will have a GC latency of max. 1 tick duration which is good enough. This way // we can make again use of our MpscLinkedQueue and so minimize the locking / overhead as much as possible. timer.cancelledTimeouts.add(this); return true; } void remove() { HashedWheelBucket bucket = this.bucket; if (bucket != null) { bucket.remove(this); } else { timer.pendingTimeouts.decrementAndGet(); } } public boolean compareAndSetState(int expected, int state) { return STATE_UPDATER.compareAndSet(this, expected, state); } public int state() { return state; } @Override public boolean isCancelled() { return state() == ST_CANCELLED; } @Override public boolean isExpired() { return state() == ST_EXPIRED; } public void expire() { if (!compareAndSetState(ST_INIT, ST_EXPIRED)) { return; } try { timer.taskExecutor.execute(this); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("An exception was thrown while submit " + TimerTask.class.getSimpleName() + " for execution.", t); } } } @Override public void run() { try { task.run(this); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t); } } } @Override public String toString() { final long currentTime = System.nanoTime(); long remaining = deadline - currentTime + timer.startTime; StringBuilder buf = new StringBuilder(192) .append(simpleClassName(this)) .append('(') .append("deadline: "); if (remaining > 0) { buf.append(remaining) .append(" ns later"); } else if (remaining < 0) { buf.append(-remaining) .append(" ns ago"); } else { buf.append("now"); } if (isCancelled()) { buf.append(", cancelled"); } return buf.append(", task: ") .append(task()) .append(')') .toString(); } } /** * 存放 HashedWheelTimeouts 的桶, * 這些數據存儲在一個類似於鏈表的數據結構中,允許輕松刪除中間的hashedwheeltimeout。HashedWheelTimeout本身作為節點,因此不需要創建額外的對象。 * 保存頭結點和尾節點,方便於任務的提取和插入 */ private static final class HashedWheelBucket { // 頭結點 private HashedWheelTimeout head; // 尾節點 private HashedWheelTimeout tail; /** * Add {@link HashedWheelTimeout} to this bucket. */ public void addTimeout(HashedWheelTimeout timeout) { assert timeout.bucket == null; timeout.bucket = this; if (head == null) { head = tail = timeout; } else { tail.next = timeout; timeout.prev = tail; tail = timeout; } } /** * Expire all {@link HashedWheelTimeout}s for the given {@code deadline}. */ public void expireTimeouts(long deadline) { HashedWheelTimeout timeout = head; // 遍歷當前時間槽中的所有任務 while (timeout != null) { HashedWheelTimeout next = timeout.next; if (timeout.remainingRounds <= 0) { // 從鏈表中移除 next = remove(timeout); if (timeout.deadline <= deadline) { // 延時任務到期,執行延時任務 timeout.expire(); } else { // The timeout was placed into a wrong slot. This should never happen. throw new IllegalStateException(String.format( "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline)); } // 如果延時任務取消,從鏈表中移除 } else if (timeout.isCancelled()) { next = remove(timeout); } else { // 任務還沒到期,剩余的輪數-1 timeout.remainingRounds --; } // 將指針放置到下一個延時任務上 timeout = next; } } /** * 刪除槽中鏈表中的延時任務 */ public HashedWheelTimeout remove(HashedWheelTimeout timeout) { HashedWheelTimeout next = timeout.next; // remove timeout that was either processed or cancelled by updating the linked-list if (timeout.prev != null) { timeout.prev.next = next; } if (timeout.next != null) { timeout.next.prev = timeout.prev; } if (timeout == head) { // if timeout is also the tail we need to adjust the entry too if (timeout == tail) { tail = null; head = null; } else { head = next; } } else if (timeout == tail) { // if the timeout is the tail modify the tail to be the prev node. tail = timeout.prev; } // null out prev, next and bucket to allow for GC. timeout.prev = null; timeout.next = null; timeout.bucket = null; timeout.timer.pendingTimeouts.decrementAndGet(); return next; } /** * Clear this bucket and return all not expired / cancelled {@link Timeout}s. */ public void clearTimeouts(Set<Timeout> set) { for (;;) { HashedWheelTimeout timeout = pollTimeout(); if (timeout == null) { return; } if (timeout.isExpired() || timeout.isCancelled()) { continue; } set.add(timeout); } } /** * 頭結點移除 */ private HashedWheelTimeout pollTimeout() { HashedWheelTimeout head = this.head; if (head == null) { return null; } HashedWheelTimeout next = head.next; if (next == null) { tail = this.head = null; } else { this.head = next; next.prev = null; } // null out prev and next to allow for GC. head.next = null; head.prev = null; head.bucket = null; return head; } } }
總結:
①:時間輪的轉動是單線程,時間輪中每個槽內延時任務的執行是線程池(4.1.73.Final)
②:延時任務保存的JVM中,沒有做宕機備份,系統重啟延時任務將會丟失,無法恢復任務重新調度
③:時間輪調度器的時間精度可能不是很高,對於精度要求特別高的調度任務可能不太適合。因為時間輪的精度取決於時間格的跨度大小
④:時間輪指針的轉動是使用sleep的方式來完成等待的
時間輪應用:
①:Dubbo、Netty、Kafka、Redission等中間件都用到了時間輪機制
②:訂單關閉,確認收貨、批量定時數據更新等都可以采用時間輪機制
1、心跳檢測:
心跳機制每隔固定時間發送一個心跳包,來檢測客戶端和服務端的連接狀態,客戶端發送心跳包用來告訴服務器其還正常運行。
如在在Dubbo中,需要有心跳機制來維持Consumer與Provider的長連接,默認的心跳間隔是60s。當Provider在3次心跳時間內沒有收到心跳響應,會關閉連接通道。當Consumer在3次心跳時間內沒有收到心跳響應,會進行重連
在Dubbo的HeaderExchangeClient類中會向時間輪中提交該心跳任務:
①:發送心跳的時間輪
private static final HashedWheelTimer IDLE_CHECK_TIMER = new HashedWheelTimer(new NamedThreadFactory("dubbo-client-idleCheck", true), 1, TimeUnit.SECONDS, TICKS_PER_WHEEL);
②:向時間輪中提交心跳任務
private void startHeartBeatTask(URL url) { // Client的具體實現決定是否啟動該心跳任務 if (!client.canHandleIdle()) { AbstractTimerTask.ChannelProvider cp = () -> Collections.singletonList(HeaderExchangeClient.this); // 計算心跳間隔, 最小間隔不能低於1s int heartbeat = getHeartbeat(url); long heartbeatTick = calculateLeastDuration(heartbeat); // 創建心跳任務 this.heartBeatTimerTask = new HeartbeatTimerTask(cp, heartbeatTick, heartbeat); // 提交到IDLE_CHECK_TIMER這個時間輪中等待執行, 等時間到了時間輪就會去取出該任務進行調度執行 IDLE_CHECK_TIMER.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS); } }
2、超時處理:
在Dubbo中發起RPC調用時,通常會配置超時時間,當消費者調用服務提供者出現超時進行一定的邏輯處理。那么怎么檢測任務調用超時了呢?我們可以利用定時任務,每次創建一個Future,記錄這個Future的創建時間與超時時間,后台有一個定時任務進行檢測,當Future到達超時時間並且沒有被處理時,就需要對這個Future執行超時邏輯處理
3、Redisson分布式鎖續期:
Redisson看門狗機制,通過時間輪定時給分布式鎖續期
在獲取鎖成功后,Redisson會封裝一個鎖續期延時任務放入到時間輪中,默認10秒檢查一下,用於對獲取到的鎖進行續期,延長持有鎖的時間。
如果業務機器宕機了,那么續期的延時任務失效,也無法續期,鎖會超時釋放。
①:添加續期延時任務
private void renewExpiration() { ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ee == null) { return; } // 這邊newTimeout點進去發現就是往時間輪中提交了一個任務 Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ent == null) { return; } Long threadId = ent.getFirstThreadId(); if (threadId == null) { return; } RFuture<Boolean> future = renewExpirationAsync(threadId); future.onComplete((res, e) -> { if (e != null) { log.error("Can't update lock " + getName() + " expiration", e); return; } if (res) { // 續期成功后繼續調度, 又往時間輪中放一個續期任務 renewExpiration(); } }); } }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); ee.setTimeout(task); }
②:lua續期代碼
protected RFuture<Boolean> renewExpirationAsync(long threadId) { // 通過lua腳本對鎖進行續期 return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;", Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
附錄:
注意:spring-boot-start-data-redis包依賴的lettuce包中依賴了netty,使用spring-data-redis時不必額外導入netty包
END.