解決的問題
當我們有多個消息的生產者線程,一個消費者線程時,他們之間如何進行高並發、線程安全的協調?
很簡單,用一個隊列。
當我們有多個消息的生產者線程,多個消費者線程,並且每一條消息需要被所有的消費者都消費一次(這就不是一般隊列,只消費一次的語義了),該怎么做?
這時仍然需要一個隊列。但是:
1. 每個消費者需要自己維護一個指針,知道自己消費了隊列中多少數據。這樣同一條消息,可以被多個人獨立消費。
2. 隊列需要一個全局指針,指向最后一條被所有生產者加入的消息。消費者在消費數據時,不能消費到這個全局指針之后的位置——因為這個全局指針,已經是代表隊列中最后一條可以被消費的消息了。
3. 需要協調所有消費者,在消費完所有隊列中的消息后,阻塞等待。
4. 如果消費者之間有依賴關系,即對同一條消息的消費順序,在業務上有固定的要求,那么還需要處理誰先消費,誰后消費同一條消息的問題。
總而言之,如果有多個生產者,多個消費者,並且同一條消息要給到所有的消費者都去處理一下,需要做到以上4點。這是不容易的。
LMAX Disruptor,正是這種場景下,滿足以上4點要求的單機跨線程消息傳遞、分發的開源、高性能實現。
這里有一篇英文的Disruptor介紹好文:https://github.com/LMAX-Exchange/disruptor/wiki/Introduction
關鍵概念
1. RingBuffer
應用需要傳遞的消息在Disrutpor中稱為Event(事件)。
RingBuffer是Event的數組,實現了阻塞隊列的語義:
如果RingBuffer滿了,則生產者會阻塞等待。
如果RingBuffer空了,則消費者會阻塞等待。
2. Sequence
在上文中,我提到“每個消費者需要自己維護一個指針”。這里的指針就是一個單調遞增長整數(及其基於CAS的加法、獲取操作),稱為Sequence。
除了每個消費者需要維護一個指針外,RingBuffer自身也要維護一個全局指針(如上一節第2點所提到的),記錄最后一條可以被消費的消息。這個全局指針就在下圖紅框中。
生產場景實現
生產者往RingBuffer中發送一條消息(RingBuffer.publish())時:
1. 生產者的私有sequence會+1
2. 檢查生產者的私有sequence與RingBuffer中Event個數的關系。如果發現Event數組滿了(下圖紅框中的判斷),則阻塞(下圖綠框中的等待)。
3. RingBuffer會在Event數組中(sequencer+1) % BUFFER_SIZE的地方,放入Event。這里的取模操作,就體現了Event數組用到最后,則回到頭部繼續放,所謂“Ring“ Buffer的輪循復用語義。
消費場景實現
消費者從RingBuffer循環隊列中獲取一條消息時:
1. 從消費者私有Sequence,可以知道它自己消費到了RingBuffer隊列中的哪一條消息。
2. 從RingBuffer的全局指針Sequence,可以知道RingBuffer中最后一條沒有被消費的消息在什么位置。
3. N = (RuingBuffer的全局指針Sequence - 消費者私有Sequence),就是當前消費者,還可以消費多少Event。
4. 如果以上差值N為0,說明當前消費者已經消費過RingBuffer中的所有消息了。那么當前消費者會阻塞。等待生產者加入更多的消息:
以上代碼中,紅框中的availableSequence就是RingBuffer的全局指針Sequence。綠框中的sequence是當前消費者的私有sequence。
如果這個判斷為true,說明RingBuffer中最新一條可以被消費的Event,已經被當前消費者消費過了。那么就會調用apployWaitMethod()阻塞,等待生產者產生更多的Event。
5. 如果RingBuffer中,還有可以被當前消費者消費的Event,即N > 0,
那么消費者,會一口氣獲取所有可以被消費的N個Event。即下圖中的while循環,直到N個Event都被消費才退出。這種一口氣消費盡量多的Event,是高性能的體現。
從RingBuffer中每獲取一個Event,都會回調綠框中的eventHandler——這是應用注冊的Event處理方法,執行應用的Event消費業務邏輯。
最后,上圖中的sequence.set(availableSequence),會把當前消費者的私有Sequence更新到RingBuffer的全局Sequence。表示RingBuffer中所有的Event都已經消費掉了。
高性能的實現細節
無鎖
無鎖就沒有鎖競爭。當生產者、消費者線程數很高時,意義重大。所以,
往大里說,每個消費者維護自己的Sequence,基本沒有跨線程共享的狀態。
往小里說,Sequence的加法是CAS實現的。
- 當生產者需要判斷RingBuffer是否已滿時,用CAS比較原先RingBuffer的Event個數,和假定放入新Event后Event的個數。
- 如果CAS返回false,說明在判斷期間,別的生產者加入了新Event;或者別的消費者拿走了Event。那么當前判斷無效,需要重新判斷。這就是常見的 do { ... } while (false == CAS(oldVal, newVal))。——都是套路:)
對象的復用
JVM運行時,一怕創建大對象,二怕創建很多小對象。這都會導致JVM堆碎片化、對象元數據存儲的額外開銷大。這是高性能Java應用的噩夢。
為了解決第二點“很多小對象”,主流開源框架都會自己維護、復用對象池。LMAX Disruptor也不例外。
生產者不是創建新的Event對象,放入到RingBuffer中。而是從RingBuffer中取出一個已有的Event對象,更新它所指向的業務數據,來代表一個邏輯上的新Event。
所以LMAX Disruptor的生產者API,用起來有些麻煩——分為三步,一是下圖綠框中取出一個已有的、已經被所有人消費過的Event對象,二是下圖紅框中更新這個Event對象所指向的業務數據,三是下圖藍框中標記這個Event對象為邏輯上的新Event。
總結
https://github.com/LMAX-Exchange/disruptor/wiki/Introduction 這篇文章對Disruptor基本概念已經介紹得很清楚了。
但是,我覺得,入門介紹結合源碼去咀嚼,才會比較sexy,朋友們會深入理解。其實也不難,關鍵是找出源碼中的核心部分。
篇幅所限,本文對於Disruptor的高級功能沒有解釋,比如處理多個消費者之間的依賴關系。有機會補充。