概述
早就想寫關於kafka時間輪的隨筆了,奈何時間不夠,技術感覺理解不到位,現在把我之前學習到的進行整理一下,以便於以后並不會忘卻。kafka時間輪是一個時間延時調度的工具,學習它可以掌握更加靈活先進的定時器技術,補益多多。本文由淺到深進行講解,先講解定時器基礎以及常用定時器,接着就是主要的kafka時間輪實現。大部分都是原理。后期作者寫第二部分的時候專門講解時間輪的實踐和使用。
定時器概念
使用場景,例如:
1,使用tcp連接的時候,客戶端需要向服務端發送心跳請求。
2,財務系統每月生成的定時賬單。
3,雙十一定時開啟秒殺開關。
總而言之就是定時器的作用就是指定特定時刻執行任務,一般定時任務的形式表現為:經過固定時間后觸發、按照固定頻率周期性觸發、在某個時刻觸發。定時器是什么?可以理解為這樣一個數據結構:
存儲一系列任務的集合,並且deadline越接近的任務,擁有越高的執行優先級。
NewTask:將新任務加入任務集合
Cancel:取消某個任務 在任務調度的視角還要支持:
Run:執行一個到底的定時任務
判斷一個任務是否到期,基本會采用輪詢的方式,每隔一個時間片去檢查最近的任務是否到期,並且,在 NewTask 和 Cancel 的行為發生之后,任務調度策略也會出現調整。這么說來定時器就是依靠線程輪詢來實現的。
定時器幾種數據結構
我們主要衡量 NewTask(新增任務),Cancel(取消任務),Run(執行到期的定時任務)這三個指標,分析他們使用不同數據結構的時間/空間復雜度。
雙向有序鏈表
在 Java 中, LinkedList 是一個天然的雙向鏈表。
NewTask:O(N)
Cancel:O(1)
Run:O(1)
N:任務數
NewTask O(N) 很容易理解,按照 expireTime 查找合適的位置即可;Cancel O(1) ,任務在 Cancel 時,會持有自己節點的引用,所以不需要查找其在鏈表中所在的位置,即可實現當前節點的刪除,這也是為什么我們使用雙向鏈表而不是普通鏈表的原因是 ;Run O(1),由於整個雙向鏈表是基於 expireTime 有序的,所以調度器只需要輪詢第一個任務即可。
堆
在 Java 中, PriorityQueue 是一個天然的堆,可以利用傳入的 Comparator 來決定其中元素的優先級。
NewTask:O(logN)
Cancel:O(logN)
Run:O(1)
N:任務數
expireTime 是 Comparator 的對比參數。NewTask O(logN) 和 Cancel O(logN) 分別對應堆插入和刪除元素的時間復雜度 ;Run O(1),由 expireTime 形成的小根堆,我們總能在堆頂找到最快的即將過期的任務。堆與雙向有序鏈表相比,NewTask 和 Cancel 形成了 trade off,但考慮到現實中,定時任務取消的場景並不是很多,所以堆實現的定時器要比雙向有序鏈表優秀。
時間輪
Netty 針對 I/O 超時調度的場景進行了優化,實現了 HashedWheelTimer 時間輪算法。
HashedWheelTimer 是一個環形結構,可以用時鍾來類比,鍾面上有很多 bucket ,每一個 bucket 上可以存放多個任務,使用一個 List 保存該時刻到期的所有任務,同時一個指針隨着時間流逝一格一格轉動,並執行對應 bucket 上所有到期的任務。任務通過 取模決定應該放入哪個 bucket 。和 HashMap 的原理類似,newTask 對應 put,使用 List 來解決 Hash 沖突。
以上圖為例,假設一個 bucket 是 1 秒,則指針轉動一輪表示的時間段為 8s,假設當前指針指向 0,此時需要調度一個 3s 后執行的任務,顯然應該加入到 (0+3=3) 的方格中,指針再走 3 次就可以執行了;如果任務要在 10s 后執行,應該等指針走完一輪零 2 格再執行,因此應放入 2,同時將 round(1)保存到任務中。檢查到期任務時只執行 round 為 0 的, bucket 上其他任務的 round 減 1。
再看圖中的 bucket5,我們可以知道在 $18+5=13s$ 后,有兩個任務需要執行,在 $28+5=21s$ 后有一個任務需要執行。
NewTask:O(1)
Cancel:O(1)
Run:O(M)
Tick:O(1)
M:bucket ,M ~ N/C ,其中 C 為單輪 bucket 數,Netty 中默認為 512
時間輪算法的復雜度可能表達有誤,我個人覺得比較難算,僅供參考。另外,其復雜度還受到多個任務分配到同一個 bucket 的影響。並且多了一個轉動指針的開銷。傳統定時器是面向任務的,時間輪定時器是面向 bucket 的。
構造 Netty 的 HashedWheelTimer 時有兩個重要的參數: tickDuration 和 ticksPerWheel。
1,tickDuration:即一個 bucket 代表的時間,默認為 100ms,Netty 認為大多數場景下不需要修改這個參數;
2,ticksPerWheel:一輪含有多少個 bucket ,默認為 512 個,如果任務較多可以增大這個參數,降低任務分配到同一個 bucket 的概率。
層級時間輪
說道今天的重點了,Kafka 針對時間輪算法進行了優化,實現了層級時間輪 TimingWheel。先簡單介紹一下,層級時間輪就是相當於時針分針秒針,秒針轉動一圈,分針就走了一個bucket。層級適合時間跨度較大時存在明顯優勢。
如果任務的時間跨度很大,數量也多,傳統的 HashedWheelTimer 會造成任務的 round 很大,單個 bucket 的任務 List 很長,並會維持很長一段時間。這時可將輪盤按時間粒度分級。下面會詳細講解kafka層級時間輪的原理。
KAFKA時間輪模式
“時間輪”的概念稍微有點抽象,我用一個生活中的例子,來幫助你建立一些初始印象。想想我們生活中的手表。手表由時針、分針和秒針組成,它們各自有獨立的刻度,但又彼此相關:秒針轉動一圈,分針會向前推進一格;分針轉動一圈,時針會向前推進一格。這就是典型的分層時間輪。和手表不太一樣的是,Kafka 自己有專門的術語。在 Kafka 中,手表中的“一格”叫“一個桶(Bucket)”,而“推進”對應於 Kafka 中的“滴答”,也就是 tick。后面你在閱讀源碼的時候,會頻繁地看到 Bucket、tick 字眼,你可以把它們理解成手表刻度盤面上的“一格”和“向前推進”的意思。除此之外,每個 Bucket 下也不是白板一塊,它實際上是一個雙向循環鏈表(Doubly Linked Cyclic List),里面保存了一組延時請求。我先用一張圖幫你理解下雙向循環鏈表。
圖中的每個節點都有一個 next 和 prev 指針,分別指向下一個元素和上一個元素。Root 是鏈表的頭部節點,不包含任何實際數據。它的 next 指針指向鏈表的第一個元素,而 prev 指針指向最后一個元素。由於是雙向鏈表結構,因此,代碼能夠利用 next 和 prev 兩個指針快速地定位元素,因此,在 Bucket 下插入和刪除一個元素的時間復雜度是 O(1)。當然,雙向鏈表要求同時保存兩個指針數據,在節省時間的同時消耗了更多的空間。在算法領域,這是典型的用空間去換時間的優化思想。
下圖是比較容易理解的時間輪:Kafka中的時間輪(TimingWheel)是一個存儲定時任務的環形隊列,底層采用數組實現,數組中的每個元素可以存放一個定時任務列表(TimerTaskList)。TimerTaskList是一個環形的雙向鏈表,鏈表中的每一項表示的都是定時任務項(TimerTaskEntry),其中封裝了真正的定時任務TimerTask。
在 Kafka 中,具體是怎么應用分層時間輪實現請求隊列的呢?
圖中的時間輪共有兩個層級,分別是 Level 0 和 Level 1。每個時間輪有 8 個 Bucket,每個 Bucket 下是一個雙向循環鏈表,用來保存延遲請求。在 Kafka 源碼中,時間輪對應 utils.timer 包下的 TimingWheel 類,每個 Bucket 下的鏈表對應 TimerTaskList 類,鏈表元素對應 TimerTaskEntry 類,而每個鏈表元素里面保存的延時任務對應 TimerTask。在這些類中,TimerTaskEntry 與 TimerTask 是 1 對 1 的關系,TimerTaskList 下包含多個 TimerTaskEntry,TimingWheel 包含多個 TimerTaskList。我畫了一張 UML 圖,幫助你理解這些類之間的對應關系。
KAFKA時間輪源碼
TimerTask
所在文件:core/src/main/scala/kafka/utils/timer/TimerTask.scala
這個trait, 繼承於 Runnable
,需要放在時間輪里執行的任務都要繼承這個TimerTask
每個TimerTask
必須和一個TimerTaskEntry
綁定,實現上放到時間輪里的是TimerTaskEntry
def cancel()
: 取消當前的Task, 實際是解除在當前TaskEntry
上的綁定
1 def cancel(): Unit = { 2 synchronized { 3 if (timerTaskEntry != null) timerTaskEntry.remove() 4 timerTaskEntry = null 5 } 6 }
TimerTaskEntry
所在文件:core/src/main/scala/kafka/utils/timer/TimerTaskList.scala
作用:綁定一個TimerTask
對象,然后被加入到一個TimerTaskLIst
中;
它是TimerTaskList
這個雙向列表 中的元素,因此有如下三個成員
1 var list: TimerTaskList = null //屬於哪一個TimerTaskList 2 var next: TimerTaskEntry = null //指向其后一個元素 3 var prev: TimerTaskEntry = null //指向其前一個元素
TimerTaskEntry
對象在構造成需要一個TimerTask
對象,並且調用
1 timerTask.setTimerTaskEntry(this)
將TimerTask
對象綁定到 TimerTaskEntry
上 如果這個TimerTask
對象之前已經綁定到了一個TimerTaskEntry
上, 先調用timerTaskEntry.remove()
解除綁定。 * def remove()
實際上就是把自己從當前所在TimerTaskList
上摘掉, 為什么要使用一個while(...)
來作,簡單說就是相當於用個自旋鎖代替讀寫鎖來盡量保證這個remove的操作的徹底。
TimerTaskList
所在文件:core/src/main/scala/kafka/utils/timer/TimerTaskList.scala
作為時間輪上的一個bucket, 是一個有頭指針的雙向鏈表
雙向鏈表結構:
1 private[this] val root = new TimerTaskEntry(null) 2 root.next = root 3 root.prev = root
繼承於java的Delayed,說明這個對象應該是要被放入javar的DelayQueue,自然要實現下面的兩個接口。
1 def getDelay(unit: TimeUnit): Long = { 2 unit.convert(max(getExpiration - SystemTime.milliseconds, 0), TimeUnit.MILLISECONDS) 3 } 4 5 def compareTo(d: Delayed): Int = { 6 7 val other = d.asInstanceOf[TimerTaskList] 8 9 if(getExpiration < other.getExpiration) -1 10 else if(getExpiration > other.getExpiration) 1 11 else 0 12 }
每個 TimerTaskList
都是時間輪上的一個bucket,自然也要關聯一個過期 時間:private[this] val expiration = new AtomicLong(-1L)
add
和remove
方法,用來添加和刪除TimerTaskEntry
foreach
方法:在鏈表的每個元素上應用給定的函數;
flush
方法:在鏈表的每個元素上應用給定的函數,並清空整個鏈表, 同時超時時間也設置為-1;
TimingWheel
所在文件:core/src/main/scala/kafka/utils/timer/TimingWheel.scala
tickMs:表示一個槽所代表的時間范圍,kafka的默認值的1ms
wheelSize:表示該時間輪有多少個槽,kafka的默認值是20
startMs:表示該時間輪的開始時間
taskCounter:表示該時間輪的任務總數
queue:是一個TimerTaskList的延遲隊列。每個槽都有它一個對應的TimerTaskList,TimerTaskList是一個雙向鏈表,有一個expireTime的值,這些TimerTaskList都被加到這個延遲隊列中,expireTime最小的槽會排在隊列的最前面。
interval:時間輪所能表示的時間跨度,也就是tickMs*wheelSize
buckets:表示TimerTaskList的數組,即各個槽。
currentTime:表示當前時間,也就是時間輪指針指向的時間
上面說了這么多,終於到這個時間輪出場了,說簡單也簡單,說復雜也復雜;
簡言之,就是根據每個TimerTaskEntry
的過期時間和當前時間輪的時間,選擇一個合適的bucket(實際上就是TimerTaskList
),這個桶的超時時間相同(會去余留整), 把這個TimerTaskEntry
對象放進去,如果當前的bucket
因超時被DelayQueue
隊列poll出來的話, 以為着這個bucket
里面的都過期, 會調用這個bucket
的flush
方法, 將里面的entry都再次add一次,在這個add里因task已過期,將被立即提交執行,同時reset這個bucket
的過期時間, 這樣它就可以用來裝入新的task了, 感謝我的同事"闊哥"的批評指正.
這個時間輪是支持層級的,就是如果當前放入的TimerTaskEntry
的過期時間如果超出了當前層級時間輪的覆蓋范圍,那么就創始一個overflowWheel: TimingWheel
,放進去,只不過這個新的時間輪的降低了很多,那的tick是老時間輪的interval(相當於老時間輪的tick * wheelSize), 基本可以類比成鍾表的分針和時針;
def add(timerTaskEntry: TimerTaskEntry): Boolean
: 將TimerTaskEntry
加入適當的TimerTaskList
;
def advanceClock(timeMs: Long):
:推動時間輪向前走,更新CurrentTime
值得注意的是,這個類不是線程安全的,也就是說add
方法和advanceClock
的調用方式使用者要來保證;
關於這個層級時間輪的原理,源碼里有詳細的說明。
Timer
所在文件:core/src/main/scala/kafka/utils/timer/Timer.scala
上面講了這么多,現在是時候把這些組裝起來了,這就是個用TimingWheel
實現的定時器,可以添加任務,任務可以取消,可以到期被執行;
構造一個TimingWheel。
1 private[this] val delayQueue = new DelayQueue[TimerTaskList]() 2 private[this] val taskCounter = new AtomicInteger(0) 3 private[this] val timingWheel = new TimingWheel( 4 tickMs = tickMs, 5 wheelSize = wheelSize, 6 startMs = startMs, 7 taskCounter = taskCounter, 8 delayQueue 9 )
taskExecutor: ExecutorService
: 用於執行具體的task;
這個類為線程安全類,因為TimingWhell
本身不是線程安全,所以對其操作需要加鎖。
1 // Locks used to protect data structures while ticking 2 private[this] val readWriteLock = new ReentrantReadWriteLock() 3 private[this] val readLock = readWriteLock.readLock() 4 private[this] val writeLock = readWriteLock.writeLock()
def add(timerTask: TimerTask):
:添加任務到定時器,通過調用def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry):
實現
1 if (!timingWheel.add(timerTaskEntry)) { 2 // Already expired or cancelled 3 if (!timerTaskEntry.cancelled) 4 taskExecutor.submit(timerTaskEntry.timerTask) 5 }
timingWheel.add(timerTaskEntry)
:如果任務已經過期或被取消,則return false; 過期的任務被提交到taskExcutor
執行;
1 var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS) 2 if (bucket != null) { 3 writeLock.lock() 4 try { 5 while (bucket != null) { 6 timingWheel.advanceClock(bucket.getExpiration()) 7 bucket.flush(reinsert) 8 bucket = delayQueue.poll() 9 } 10 } finally { 11 writeLock.unlock() 12 } 13 true 14 } else { 15 false 16 }
delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
獲取到期的bucket;
調用timingWheel.advanceClock(bucket.getExpiration())
bucket.flush(reinsert)
:對bucket中的每一個TimerEntry
調用reinsert
, 實際上是調用addTimerTaskEntry(timerTaskEntry)
, 此時到期的Task會被執行;
總結
感謝網絡大神的分享:
https://zhuanlan.zhihu.com/p/51405974
https://mp.weixin.qq.com/s?__biz=MjM5NzMyMjAwMA==&mid=2651485083&idx=1&sn=089a76c2ccef0a98831a389fd7943d23&chksm=bd251fe48a5296f22df28cd53827eac4cb153855b70266ce753cdb0ee6d145782e5a664ebd9a&mpshare=1&scene=1&srcid=1004BPAgMnZs9xgVCwrXFMog&sharer_sharetime=1591491757235&sharer_shareid=d40e8d2bb00008844e69867bcfc0d895#rd
https://www.jianshu.com/p/0f0fec47a0ad