LMAX Disruptor—多生產者多消費者中,消息復制分發的高性能實現


解決的問題

當我們有多個消息的生產者線程,一個消費者線程時,他們之間如何進行高並發、線程安全的協調?

很簡單,用一個隊列。

 

當我們有多個消息的生產者線程,多個消費者線程,並且每一條消息需要被所有的消費者都消費一次(這就不是一般隊列,只消費一次的語義了),該怎么做?

這時仍然需要一個隊列。但是:

1. 每個消費者需要自己維護一個指針,知道自己消費了隊列中多少數據。這樣同一條消息,可以被多個人獨立消費。

2. 隊列需要一個全局指針,指向最后一條被所有生產者加入的消息。消費者在消費數據時,不能消費到這個全局指針之后的位置——因為這個全局指針,已經是代表隊列中最后一條可以被消費的消息了。

3. 需要協調所有消費者,在消費完所有隊列中的消息后,阻塞等待。

4. 如果消費者之間有依賴關系,即對同一條消息的消費順序,在業務上有固定的要求,那么還需要處理誰先消費,誰后消費同一條消息的問題。

 

總而言之,如果有多個生產者,多個消費者,並且同一條消息要給到所有的消費者都去處理一下,需要做到以上4點。這是不容易的。

LMAX Disruptor,正是這種場景下,滿足以上4點要求的單機跨線程消息傳遞、分發的開源、高性能實現。

這里有一篇英文的Disruptor介紹好文:https://github.com/LMAX-Exchange/disruptor/wiki/Introduction

 

關鍵概念

1. RingBuffer

應用需要傳遞的消息在Disrutpor中稱為Event(事件)。

RingBuffer是Event的數組,實現了阻塞隊列的語義:

如果RingBuffer滿了,則生產者會阻塞等待。

如果RingBuffer空了,則消費者會阻塞等待。

 

2. Sequence

在上文中,我提到“每個消費者需要自己維護一個指針”。這里的指針就是一個單調遞增長整數(及其基於CAS的加法、獲取操作),稱為Sequence。

除了每個消費者需要維護一個指針外,RingBuffer自身也要維護一個全局指針(如上一節第2點所提到的),記錄最后一條可以被消費的消息。這個全局指針就在下圖紅框中。

生產場景實現

生產者往RingBuffer中發送一條消息(RingBuffer.publish())時:

1. 生產者的私有sequence會+1

2. 檢查生產者的私有sequence與RingBuffer中Event個數的關系。如果發現Event數組滿了(下圖紅框中的判斷),則阻塞(下圖綠框中的等待)。

 

3. RingBuffer會在Event數組中(sequencer+1) % BUFFER_SIZE的地方,放入Event。這里的取模操作,就體現了Event數組用到最后,則回到頭部繼續放,所謂“Ring“ Buffer的輪循復用語義。

 

消費場景實現

 消費者從RingBuffer循環隊列中獲取一條消息時:

1. 從消費者私有Sequence,可以知道它自己消費到了RingBuffer隊列中的哪一條消息。

2. 從RingBuffer的全局指針Sequence,可以知道RingBuffer中最后一條沒有被消費的消息在什么位置。

3. N = (RuingBuffer的全局指針Sequence - 消費者私有Sequence),就是當前消費者,還可以消費多少Event。

4. 如果以上差值N為0,說明當前消費者已經消費過RingBuffer中的所有消息了。那么當前消費者會阻塞。等待生產者加入更多的消息:

 

 以上代碼中,紅框中的availableSequence就是RingBuffer的全局指針Sequence。綠框中的sequence是當前消費者的私有sequence。

如果這個判斷為true,說明RingBuffer中最新一條可以被消費的Event,已經被當前消費者消費過了。那么就會調用apployWaitMethod()阻塞,等待生產者產生更多的Event。

 5. 如果RingBuffer中,還有可以被當前消費者消費的Event,即N > 0,

     那么消費者,會一口氣獲取所有可以被消費的N個Event。即下圖中的while循環,直到N個Event都被消費才退出。這種一口氣消費盡量多的Event,是高性能的體現。

     從RingBuffer中每獲取一個Event,都會回調綠框中的eventHandler——這是應用注冊的Event處理方法,執行應用的Event消費業務邏輯。

  

   最后,上圖中的sequence.set(availableSequence),會把當前消費者的私有Sequence更新到RingBuffer的全局Sequence。表示RingBuffer中所有的Event都已經消費掉了。

 

 高性能的實現細節

無鎖

無鎖就沒有鎖競爭。當生產者、消費者線程數很高時,意義重大。所以,

往大里說,每個消費者維護自己的Sequence,基本沒有跨線程共享的狀態。

往小里說,Sequence的加法是CAS實現的。

  • 當生產者需要判斷RingBuffer是否已滿時,用CAS比較原先RingBuffer的Event個數,和假定放入新Event后Event的個數。
  • 如果CAS返回false,說明在判斷期間,別的生產者加入了新Event;或者別的消費者拿走了Event。那么當前判斷無效,需要重新判斷。這就是常見的 do { ... } while (false == CAS(oldVal, newVal))。——都是套路:)

 

對象的復用

JVM運行時,一怕創建大對象,二怕創建很多小對象。這都會導致JVM堆碎片化、對象元數據存儲的額外開銷大。這是高性能Java應用的噩夢。

為了解決第二點“很多小對象”,主流開源框架都會自己維護、復用對象池。LMAX Disruptor也不例外。

生產者不是創建新的Event對象,放入到RingBuffer中。而是從RingBuffer中取出一個已有的Event對象,更新它所指向的業務數據,來代表一個邏輯上的新Event。

所以LMAX Disruptor的生產者API,用起來有些麻煩——分為三步,一是下圖綠框中取出一個已有的、已經被所有人消費過的Event對象,二是下圖紅框中更新這個Event對象所指向的業務數據,三是下圖藍框中標記這個Event對象為邏輯上的新Event。

 

總結

https://github.com/LMAX-Exchange/disruptor/wiki/Introduction 這篇文章對Disruptor基本概念已經介紹得很清楚了。

但是,我覺得,入門介紹結合源碼去咀嚼,才會比較sexy,朋友們會深入理解。其實也不難,關鍵是找出源碼中的核心部分。

篇幅所限,本文對於Disruptor的高級功能沒有解釋,比如處理多個消費者之間的依賴關系。有機會補充。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM