kafka時間輪的原理(一)


概述

早就想寫關於kafka時間輪的隨筆了,奈何時間不夠,技術感覺理解不到位,現在把我之前學習到的進行整理一下,以便於以后並不會忘卻。kafka時間輪是一個時間延時調度的工具,學習它可以掌握更加靈活先進的定時器技術,補益多多。本文由淺到深進行講解,先講解定時器基礎以及常用定時器,接着就是主要的kafka時間輪實現。大部分都是原理。后期作者寫第二部分的時候專門講解時間輪的實踐和使用。

定時器概念

使用場景,例如:

1,使用tcp連接的時候,客戶端需要向服務端發送心跳請求。

2,財務系統每月生成的定時賬單。

3,雙十一定時開啟秒殺開關。

總而言之就是定時器的作用就是指定特定時刻執行任務,一般定時任務的形式表現為:經過固定時間后觸發、按照固定頻率周期性觸發、在某個時刻觸發。定時器是什么?可以理解為這樣一個數據結構:

存儲一系列任務的集合,並且deadline越接近的任務,擁有越高的執行優先級。

NewTask:將新任務加入任務集合

Cancel:取消某個任務 在任務調度的視角還要支持:

Run:執行一個到底的定時任務

判斷一個任務是否到期,基本會采用輪詢的方式,每隔一個時間片去檢查最近的任務是否到期,並且,在 NewTask 和 Cancel 的行為發生之后,任務調度策略也會出現調整。這么說來定時器就是依靠線程輪詢來實現的。

定時器幾種數據結構

我們主要衡量 NewTask(新增任務),Cancel(取消任務),Run(執行到期的定時任務)這三個指標,分析他們使用不同數據結構的時間/空間復雜度。

雙向有序鏈表

在 Java 中, LinkedList 是一個天然的雙向鏈表。

NewTask:O(N)

Cancel:O(1)

Run:O(1)

N:任務數

NewTask O(N) 很容易理解,按照 expireTime 查找合適的位置即可;Cancel O(1) ,任務在 Cancel 時,會持有自己節點的引用,所以不需要查找其在鏈表中所在的位置,即可實現當前節點的刪除,這也是為什么我們使用雙向鏈表而不是普通鏈表的原因是 ;Run O(1),由於整個雙向鏈表是基於 expireTime 有序的,所以調度器只需要輪詢第一個任務即可。

在 Java 中, PriorityQueue 是一個天然的堆,可以利用傳入的 Comparator 來決定其中元素的優先級。

NewTask:O(logN)

Cancel:O(logN)

Run:O(1)

N:任務數

expireTime 是 Comparator 的對比參數。NewTask O(logN) 和 Cancel O(logN) 分別對應堆插入和刪除元素的時間復雜度 ;Run O(1),由 expireTime 形成的小根堆,我們總能在堆頂找到最快的即將過期的任務。堆與雙向有序鏈表相比,NewTask 和 Cancel 形成了 trade off,但考慮到現實中,定時任務取消的場景並不是很多,所以堆實現的定時器要比雙向有序鏈表優秀。

時間輪

Netty 針對 I/O 超時調度的場景進行了優化,實現了 HashedWheelTimer 時間輪算法。

HashedWheelTimer 是一個環形結構,可以用時鍾來類比,鍾面上有很多 bucket ,每一個 bucket 上可以存放多個任務,使用一個 List 保存該時刻到期的所有任務,同時一個指針隨着時間流逝一格一格轉動,並執行對應 bucket 上所有到期的任務。任務通過 取模決定應該放入哪個 bucket 。和 HashMap 的原理類似,newTask 對應 put,使用 List 來解決 Hash 沖突。

以上圖為例,假設一個 bucket 是 1 秒,則指針轉動一輪表示的時間段為 8s,假設當前指針指向 0,此時需要調度一個 3s 后執行的任務,顯然應該加入到 (0+3=3) 的方格中,指針再走 3 次就可以執行了;如果任務要在 10s 后執行,應該等指針走完一輪零 2 格再執行,因此應放入 2,同時將 round(1)保存到任務中。檢查到期任務時只執行 round 為 0 的, bucket 上其他任務的 round 減 1。

再看圖中的 bucket5,我們可以知道在 $18+5=13s$ 后,有兩個任務需要執行,在 $28+5=21s$ 后有一個任務需要執行。

NewTask:O(1)

Cancel:O(1)

Run:O(M)

Tick:O(1)

M:bucket ,M ~ N/C ,其中 C 為單輪 bucket 數,Netty 中默認為 512

時間輪算法的復雜度可能表達有誤,我個人覺得比較難算,僅供參考。另外,其復雜度還受到多個任務分配到同一個 bucket 的影響。並且多了一個轉動指針的開銷。傳統定時器是面向任務的,時間輪定時器是面向 bucket 的。

構造 Netty 的 HashedWheelTimer 時有兩個重要的參數: tickDuration 和 ticksPerWheel。

1,tickDuration:即一個 bucket 代表的時間,默認為 100ms,Netty 認為大多數場景下不需要修改這個參數;

2,ticksPerWheel:一輪含有多少個 bucket ,默認為 512 個,如果任務較多可以增大這個參數,降低任務分配到同一個 bucket 的概率。

層級時間輪

說道今天的重點了,Kafka 針對時間輪算法進行了優化,實現了層級時間輪 TimingWheel。先簡單介紹一下,層級時間輪就是相當於時針分針秒針,秒針轉動一圈,分針就走了一個bucket。層級適合時間跨度較大時存在明顯優勢。

如果任務的時間跨度很大,數量也多,傳統的 HashedWheelTimer 會造成任務的 round 很大,單個 bucket 的任務 List 很長,並會維持很長一段時間。這時可將輪盤按時間粒度分級。下面會詳細講解kafka層級時間輪的原理。

KAFKA時間輪模式

“時間輪”的概念稍微有點抽象,我用一個生活中的例子,來幫助你建立一些初始印象。想想我們生活中的手表。手表由時針、分針和秒針組成,它們各自有獨立的刻度,但又彼此相關:秒針轉動一圈,分針會向前推進一格;分針轉動一圈,時針會向前推進一格。這就是典型的分層時間輪。和手表不太一樣的是,Kafka 自己有專門的術語。在 Kafka 中,手表中的“一格”叫“一個桶(Bucket)”,而“推進”對應於 Kafka 中的“滴答”,也就是 tick。后面你在閱讀源碼的時候,會頻繁地看到 Bucket、tick 字眼,你可以把它們理解成手表刻度盤面上的“一格”和“向前推進”的意思。除此之外,每個 Bucket 下也不是白板一塊,它實際上是一個雙向循環鏈表(Doubly Linked Cyclic List),里面保存了一組延時請求。我先用一張圖幫你理解下雙向循環鏈表。

圖中的每個節點都有一個 next 和 prev 指針,分別指向下一個元素和上一個元素。Root 是鏈表的頭部節點,不包含任何實際數據。它的 next 指針指向鏈表的第一個元素,而 prev 指針指向最后一個元素。由於是雙向鏈表結構,因此,代碼能夠利用 next 和 prev 兩個指針快速地定位元素,因此,在 Bucket 下插入和刪除一個元素的時間復雜度是 O(1)。當然,雙向鏈表要求同時保存兩個指針數據,在節省時間的同時消耗了更多的空間。在算法領域,這是典型的用空間去換時間的優化思想。

下圖是比較容易理解的時間輪:Kafka中的時間輪(TimingWheel)是一個存儲定時任務的環形隊列,底層采用數組實現,數組中的每個元素可以存放一個定時任務列表(TimerTaskList)。TimerTaskList是一個環形的雙向鏈表,鏈表中的每一項表示的都是定時任務項(TimerTaskEntry),其中封裝了真正的定時任務TimerTask。

在 Kafka 中,具體是怎么應用分層時間輪實現請求隊列的呢?

圖中的時間輪共有兩個層級,分別是 Level 0 和 Level 1。每個時間輪有 8 個 Bucket,每個 Bucket 下是一個雙向循環鏈表,用來保存延遲請求。在 Kafka 源碼中,時間輪對應 utils.timer 包下的 TimingWheel 類,每個 Bucket 下的鏈表對應 TimerTaskList 類,鏈表元素對應 TimerTaskEntry 類,而每個鏈表元素里面保存的延時任務對應 TimerTask。在這些類中,TimerTaskEntry 與 TimerTask 是 1 對 1 的關系,TimerTaskList 下包含多個 TimerTaskEntry,TimingWheel 包含多個 TimerTaskList。我畫了一張 UML 圖,幫助你理解這些類之間的對應關系。

KAFKA時間輪源碼

TimerTask

所在文件:core/src/main/scala/kafka/utils/timer/TimerTask.scala

這個trait, 繼承於 Runnable,需要放在時間輪里執行的任務都要繼承這個TimerTask

每個TimerTask必須和一個TimerTaskEntry綁定,實現上放到時間輪里的是TimerTaskEntry

def cancel(): 取消當前的Task, 實際是解除在當前TaskEntry上的綁定

1 def cancel(): Unit = {
2     synchronized {
3       if (timerTaskEntry != null) timerTaskEntry.remove()
4       timerTaskEntry = null
5     }
6   }

TimerTaskEntry

所在文件:core/src/main/scala/kafka/utils/timer/TimerTaskList.scala

作用:綁定一個TimerTask對象,然后被加入到一個TimerTaskLIst中;

它是TimerTaskList這個雙向列表 中的元素,因此有如下三個成員

1   var list: TimerTaskList = null //屬於哪一個TimerTaskList
2   var next: TimerTaskEntry = null //指向其后一個元素 
3   var prev: TimerTaskEntry = null //指向其前一個元素

TimerTaskEntry對象在構造成需要一個TimerTask對象,並且調用

1 timerTask.setTimerTaskEntry(this)

TimerTask對象綁定到 TimerTaskEntry上 如果這個TimerTask對象之前已經綁定到了一個TimerTaskEntry上, 先調用timerTaskEntry.remove()解除綁定。 * def remove()

實際上就是把自己從當前所在TimerTaskList上摘掉, 為什么要使用一個while(...)來作,簡單說就是相當於用個自旋鎖代替讀寫鎖來盡量保證這個remove的操作的徹底。

TimerTaskList

所在文件:core/src/main/scala/kafka/utils/timer/TimerTaskList.scala

作為時間輪上的一個bucket, 是一個有頭指針的雙向鏈表

雙向鏈表結構:

1 private[this] val root = new TimerTaskEntry(null)
2   root.next = root
3   root.prev = root

繼承於java的Delayed,說明這個對象應該是要被放入javar的DelayQueue,自然要實現下面的兩個接口。

 1 def getDelay(unit: TimeUnit): Long = {
 2     unit.convert(max(getExpiration - SystemTime.milliseconds, 0), TimeUnit.MILLISECONDS)
 3   }
 4 
 5   def compareTo(d: Delayed): Int = {
 6 
 7     val other = d.asInstanceOf[TimerTaskList]
 8 
 9     if(getExpiration < other.getExpiration) -1
10     else if(getExpiration > other.getExpiration) 1
11     else 0
12   }

每個 TimerTaskList都是時間輪上的一個bucket,自然也要關聯一個過期 時間:private[this] val expiration = new AtomicLong(-1L)

addremove方法,用來添加和刪除TimerTaskEntry

foreach方法:在鏈表的每個元素上應用給定的函數;

flush方法:在鏈表的每個元素上應用給定的函數,並清空整個鏈表, 同時超時時間也設置為-1;

TimingWheel

所在文件:core/src/main/scala/kafka/utils/timer/TimingWheel.scala

tickMs:表示一個槽所代表的時間范圍,kafka的默認值的1ms

wheelSize:表示該時間輪有多少個槽,kafka的默認值是20

startMs:表示該時間輪的開始時間

taskCounter:表示該時間輪的任務總數

queue:是一個TimerTaskList的延遲隊列。每個槽都有它一個對應的TimerTaskList,TimerTaskList是一個雙向鏈表,有一個expireTime的值,這些TimerTaskList都被加到這個延遲隊列中,expireTime最小的槽會排在隊列的最前面。

interval:時間輪所能表示的時間跨度,也就是tickMs*wheelSize

buckets:表示TimerTaskList的數組,即各個槽。

currentTime:表示當前時間,也就是時間輪指針指向的時間

上面說了這么多,終於到這個時間輪出場了,說簡單也簡單,說復雜也復雜;

簡言之,就是根據每個TimerTaskEntry的過期時間和當前時間輪的時間,選擇一個合適的bucket(實際上就是TimerTaskList),這個桶的超時時間相同(會去余留整), 把這個TimerTaskEntry對象放進去,如果當前的bucket因超時被DelayQueue隊列poll出來的話, 以為着這個bucket里面的都過期, 會調用這個bucketflush方法, 將里面的entry都再次add一次,在這個add里因task已過期,將被立即提交執行,同時reset這個bucket的過期時間, 這樣它就可以用來裝入新的task了, 感謝我的同事"闊哥"的批評指正.

這個時間輪是支持層級的,就是如果當前放入的TimerTaskEntry的過期時間如果超出了當前層級時間輪的覆蓋范圍,那么就創始一個overflowWheel: TimingWheel,放進去,只不過這個新的時間輪的降低了很多,那的tick是老時間輪的interval(相當於老時間輪的tick * wheelSize), 基本可以類比成鍾表的分針和時針;

def add(timerTaskEntry: TimerTaskEntry): Boolean: 將TimerTaskEntry加入適當的TimerTaskList;

def advanceClock(timeMs: Long)::推動時間輪向前走,更新CurrentTime

值得注意的是,這個類不是線程安全的,也就是說add方法和advanceClock的調用方式使用者要來保證;

關於這個層級時間輪的原理,源碼里有詳細的說明。

Timer

所在文件:core/src/main/scala/kafka/utils/timer/Timer.scala

上面講了這么多,現在是時候把這些組裝起來了,這就是個用TimingWheel實現的定時器,可以添加任務,任務可以取消,可以到期被執行;

構造一個TimingWheel。

1 private[this] val delayQueue = new DelayQueue[TimerTaskList]()
2   private[this] val taskCounter = new AtomicInteger(0)
3   private[this] val timingWheel = new TimingWheel(
4     tickMs = tickMs,
5     wheelSize = wheelSize,
6     startMs = startMs,
7     taskCounter = taskCounter,
8     delayQueue
9   )

taskExecutor: ExecutorService: 用於執行具體的task;

這個類為線程安全類,因為TimingWhell本身不是線程安全,所以對其操作需要加鎖。

1 // Locks used to protect data structures while ticking
2   private[this] val readWriteLock = new ReentrantReadWriteLock()
3   private[this] val readLock = readWriteLock.readLock()
4   private[this] val writeLock = readWriteLock.writeLock()

 def add(timerTask: TimerTask)::添加任務到定時器,通過調用def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry):實現

1 if (!timingWheel.add(timerTaskEntry)) {
2       // Already expired or cancelled
3       if (!timerTaskEntry.cancelled)
4         taskExecutor.submit(timerTaskEntry.timerTask)
5     }

timingWheel.add(timerTaskEntry):如果任務已經過期或被取消,則return false; 過期的任務被提交到taskExcutor執行;

 1 var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
 2     if (bucket != null) {
 3       writeLock.lock()
 4       try {
 5         while (bucket != null) {
 6           timingWheel.advanceClock(bucket.getExpiration())
 7           bucket.flush(reinsert)
 8           bucket = delayQueue.poll()
 9         }
10       } finally {
11         writeLock.unlock()
12       }
13       true
14     } else {
15       false
16     }

delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS) 獲取到期的bucket;

調用timingWheel.advanceClock(bucket.getExpiration())

bucket.flush(reinsert):對bucket中的每一個TimerEntry調用reinsert, 實際上是調用addTimerTaskEntry(timerTaskEntry), 此時到期的Task會被執行;

總結

感謝網絡大神的分享:

https://zhuanlan.zhihu.com/p/51405974

https://mp.weixin.qq.com/s?__biz=MjM5NzMyMjAwMA==&mid=2651485083&idx=1&sn=089a76c2ccef0a98831a389fd7943d23&chksm=bd251fe48a5296f22df28cd53827eac4cb153855b70266ce753cdb0ee6d145782e5a664ebd9a&mpshare=1&scene=1&srcid=1004BPAgMnZs9xgVCwrXFMog&sharer_sharetime=1591491757235&sharer_shareid=d40e8d2bb00008844e69867bcfc0d895#rd

https://www.jianshu.com/p/0f0fec47a0ad

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM