一、前言
在網絡通信中管理上萬的連接,每個連接都有超時任務,如果為每個任務啟動一個TImer超時器,那么會占用大量資源。為了解決這個問題,可用Netty工具類HashedWheelTimer。
二、HashedWheelTimer原理
1.概論
(學習一個類,最好的方式是看api文檔或源碼的注釋,我下載了Netty源碼)
這個類用來計划執行非精准的I/O超時。可以通過指定每一格的時間間隔來改變執行時間的精確度。在大多數網絡應用中,I/O超時不需要十分准確,因此,默認的時間間隔是100 毫秒,這個值適用於大多數場合。HashedWheelTimer內部結構可以看做是個車輪,簡單來說,就是TimerTask的hashTable的車輪。車輪的size默認是512,可以通過構造函數自己設置這個值。注意,當HashedWheelTimer被實例化啟動后,會創建一個新的線程,因此,你的項目里應該只創建它的唯一一個實例。
(這個類的源自一位教授的論文 Tony Lauck's paper。算法是代碼的靈魂,最難的是算法)
2.結構
下方圖示闡述了大致的結構模型
Demo
1 @Test(timeout = 50000000) 2 public void testTimerOverflowWheelLength() throws InterruptedException { 3 final HashedWheelTimer timer = new HashedWheelTimer( 4 Executors.defaultThreadFactory(), 100, TimeUnit.MILLISECONDS, 32); 6 7 timer.newTimeout(new TimerTask() { 8 @Override 9 public void run(final Timeout timeout) throws Exception { 10 System.out.println("lee"); //打印名字 13 } 14 }, 1000, TimeUnit.MILLISECONDS); 15 16 Thread.sleep(10000); 18 assertFalse(timer.stop().isEmpty()); 19 }
通過上面demo,我再說明一下hashedWheelTimer類的構造。
超時任務1000毫秒,超時之后,由hashedWheelTimer類中的worker線程,執行超時之后的任務(打印名字)。hashedWheelTimer有32個槽(相當於一個圓的32分之一),每移動一個槽的時間是100毫秒。
任務需要經過的tick數為: 1000 / 100 = 10次 (等待時長 / tickDuration)
任務需要經過的輪數為 : 10次 / 32次/輪 = 0輪 (tick總次數 / ticksPerWheel)
因為任務超時后不能馬上被worker線程執行,需要等到worker線程移到相應卡槽位置時才會執行,因此說執行時間不精確。
hashedWheelTimer的核心是Worker線程,主要負責每過tickDuration時間就累加一次tick. 同時, 也負責執行到期的timeout任務, 此外,還負責添加timeou任務到指定的wheel中。
接下看看源碼部分。
構造器
構造器的各個參數的說明和注釋十分詳細,應該沒有不好理解的地方。
1 /** 2 * Creates a new timer. 3 * 4 * @param threadFactory a {@link ThreadFactory} that creates a 5 * background {@link Thread} which is dedicated to 用來創建后台線程 6 * {@link TimerTask} execution. 7 * @param tickDuration the duration between tick 每格時間間隔 默認 100 8 * @param unit the time unit of the {@code tickDuration} 時間單位 默認 毫秒 9 * @param ticksPerWheel the size of the wheel 輪子size(一圈多少格)
默認 512 如果不是2的N次方,則去大於該參數的第一個2的N次方
理由 便於Hash值的計算10 * @param leakDetection {@code true} if leak detection should be enabled always, 11 * if false it will only be enabled if the worker thread is not 12 * a daemon thread. 13 * @param maxPendingTimeouts The maximum number of pending timeouts after which call to 14 * {@code newTimeout} will result in 15 * {@link java.util.concurrent.RejectedExecutionException} 16 * being thrown. No maximum pending timeouts limit is assumed if 17 * this value is 0 or negative. 最大待定超時時間 默認 不設置 18 * @throws NullPointerException if either of {@code threadFactory} and {@code unit} is {@code null} 19 * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is <= 0 20 */ 21 public HashedWheelTimer( 22 ThreadFactory threadFactory, 23 long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection, 24 long maxPendingTimeouts) {
...
38 39 // Normalize ticksPerWheel to power of two and initialize the wheel. 40 wheel = createWheel(ticksPerWheel); 41 mask = wheel.length - 1; 42 43 // Convert tickDuration to nanos. 44 long duration = unit.toNanos(tickDuration);
...52 53 if (duration < MILLISECOND_NANOS) { 54 if (logger.isWarnEnabled()) { 55 logger.warn("Configured tickDuration %d smaller then %d, using 1ms.", 56 tickDuration, MILLISECOND_NANOS); 57 } 58 this.tickDuration = MILLISECOND_NANOS; 59 } else { 60 this.tickDuration = duration; 61 } 62 // 這里創建worker線程,worker線程是重點。 63 workerThread = threadFactory.newThread(worker); 64 65 leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null; 66 67 this.maxPendingTimeouts = maxPendingTimeouts; 68 69 if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT && 70 WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) { 71 reportTooManyInstances(); 72 } 73 }
我們接下來看newTimeout()方法
1 @Override 2 public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) { ... 9 10 long pendingTimeoutsCount = pendingTimeouts.incrementAndGet(); 11 12 if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) { 13 pendingTimeouts.decrementAndGet(); 14 throw new RejectedExecutionException("Number of pending timeouts (" 15 + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending " 16 + "timeouts (" + maxPendingTimeouts + ")"); 17 } 18 19 start(); 20 21 // Add the timeout to the timeout queue which will be processed on the next tick. 22 // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket. 23 long deadline = System.nanoTime() + unit.toNanos(delay) - startTime; 24 25 // Guard against overflow. 26 if (delay > 0 && deadline < 0) { 27 deadline = Long.MAX_VALUE; 28 } 29 HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline); 30 timeouts.add(timeout); 31 return timeout; 32 }
我們接着進到worker線程的啟動:
1 /** 2 * Starts the background thread explicitly. The background thread will 3 * start automatically on demand even if you did not call this method. 4 * 5 * @throws IllegalStateException if this timer has been 6 * {@linkplain #stop() stopped} already 7 */ 8 public void start() { // 這個方法為什么是public的?我們可以在實例化HashedWheelTimer后,主動調用這個方法,啟動worker線程 9 switch (WORKER_STATE_UPDATER.get(this)) { 10 case WORKER_STATE_INIT: 11 if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) { 12 workerThread.start(); // 在這里啟動了worker線程 13 } 14 break; 15 case WORKER_STATE_STARTED: 16 break; 17 case WORKER_STATE_SHUTDOWN: 18 throw new IllegalStateException("cannot be started once stopped"); 19 default: 20 throw new Error("Invalid WorkerState"); 21 } 22 23 // Wait until the startTime is initialized by the worker.
// 為了等待worker線程初始化startTime 24 while (startTime == 0) { 25 try { 26 startTimeInitialized.await(); 27 } catch (InterruptedException ignore) { 28 // Ignore - it will be ready very soon. 29 } 30 } 31 }
下一步,我們看看worker線程里面的操作。
首先是初始化startTime,CountDownLatch的觸發,后面do while操作可以看作是圓盤在一個個轉動,每轉一個就會用worker線程處理,格子中超時的任務。
1 @Override 2 public void run() { 3 // Initialize the startTime. 4 startTime = System.nanoTime(); 5 if (startTime == 0) { 6 // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized. 7 startTime = 1; 8 } 9 10 // Notify the other threads waiting for the initialization at start(). 11 startTimeInitialized.countDown(); 12 13 do { 14 final long deadline = waitForNextTick(); 15 if (deadline > 0) { 16 int idx = (int) (tick & mask); 17 processCancelledTasks(); 18 HashedWheelBucket bucket = 19 wheel[idx]; 20 transferTimeoutsToBuckets(); 21 bucket.expireTimeouts(deadline); 22 tick++; 23 } 24 } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED); 25 26 // Fill the unprocessedTimeouts so we can return them from stop() method. 27 for (HashedWheelBucket bucket: wheel) { 28 bucket.clearTimeouts(unprocessedTimeouts); 29 } 30 for (;;) { 31 HashedWheelTimeout timeout = timeouts.poll(); 32 if (timeout == null) { 33 break; 34 } 35 if (!timeout.isCancelled()) { 36 unprocessedTimeouts.add(timeout); 37 } 38 } 39 processCancelledTasks(); 40 }
----------------------------------------------------------------------------------------------
參考資源:
https://www.jianshu.com/p/328f22432638
https://my.oschina.net/haogrgr/blog/489320
https://chuansongme.com/n/1650380646616