disruptor調研報告


票池暫定使用disruptor來做消息隊列,把最近對disruptor的調研結果整理一下。大部分文字都是把disruptor和其它網站上看到的資料翻譯一下。

原文:http://www.oraclejavamagazine-digital.com/javamagazine/20120304/?pg=56&pm=1&u1=friend#pg56

Disruptor是什么?

Disruptor是一個線程間通信的框架,即在多線程間共享數據。它是由LMAX公司開發的可信消息傳遞架構的一部分,以便用非常快速的方法來在多組件之間傳遞數據。它的一個核心思想是理解並適應硬件工作方式來達到最優的效果。

在很多(並行)架構里,普遍使用隊列來共享數據(例如傳遞消息)。圖1就是使用隊列來傳遞消息的一個示意圖(里面藍色的小圈圈表示一個線程)。這種架構允許生產線程(圖1里的stage1)在消費線程(圖1里的stage2)處理不過來的情況下,還可以繼續后面的工作,隊列在其中用來做為消息的緩沖區。

image圖1

在最簡單的情況下,disruptor可以用來替代圖1架構里的隊列,也就是線程間通過disruptor來傳遞數據。在disruptor里保存消息的數據結構是環狀緩沖區(RingBuffer - 后面都用RingBuffer這個術語)。生產線程stage1將消息放到RingBuffer里,然后消費線程stage2從RingBuffer里讀取消息,如圖2。

image圖2

從圖2里可以看到,RingBuffer里的每一個元素都有一個序列號(sequence number)來索引,RingBuffer維護當前最新放置的元素的序列號,這個序列號一直遞增,(通過求余來得到元素在RingBuffer下面的數組下標)。

Disruptor的關鍵特性是無鎖編程,這個是通過單一寫線程的方式實現的 - 即一塊數據永遠只有一個線程寫入。通過遵循這個編程原則來避免使用昂貴的同步鎖或CAS操作,這就是為什么Disruptor這么快的原因。

因為RingBuffer規避了鎖,而且每個EventProcessor維護自己的序列號。

向Disruptor發布消息

往RingBuffer里寫入消息使用兩步提交的方式。首先,生產線程Stage1需要確定RingBuffer里下一個空閑槽,如圖3。

image圖3

RingBuffer維護了最后一次寫入的序列號(圖3里的18號),因此就可以推知下一個空閑的槽號。RingBuffer通過檢查所有從RingBuffer讀取消息的EventProcessor的序列號,以判別下一個槽號是否空閑。

圖4演示了索取下一個空閑槽序列號的過程。

image圖4

當生產線程拿到了下一個序利號之后,它從RingBuffer里拿到槽里保存的對象並執行任何操作。這個過程中,因為RingBuffer的最新序列號依然是18,因此其它線程無法讀取19號槽里面的事件 - 生產線程還在處理它。

image圖5

圖5演示了RingBuffer在提交變更后的情況。當生產線程處理完第19號槽的數據后,它告訴RingBuffer將其公布出來。這個時候,RingBuffer才會更新它維護的序列號,任何等待讀取第19號槽里的數據的線程才能讀取它。

從RingBuffer里讀取信息

Disruptor框架里提供了一個叫做BatchEventProcessor來從RingBuffer里讀取數據。當生產線程向RingBuffer要求下一個可寫入的空閑槽的序列號時,同時一個EventProcessor(類似消費者,但其並消費RingBuffer里的元素 - 即不從RingBuffer里移除任何元素)也會維護其最后所處理的數據的序列號,並要求下一個可處理的數據的序列號。

圖6演示了EventProcessor等待處理下一個可讀取數據序利號的過程。

image圖6

EventProcessor不是直接從RingBuffer里獲取下一個可讀取數據的序列號,而是通過一個SequenceBarrier對象來做的,稍后我們談這個細節。

圖6里,EventProcessor(即消費者線程Stage2)最后看到的是第16號槽的數據,它希望處理下一個(第17號)槽的數據,因此它執行SequenceBarrier的waitFor(17)函數調用。線程Stage2可以一直等待下一個可讀序列號,因為如果尚沒有數據生產出來的話,它什么也不需要做。但跟圖6所示的一樣,RingBuffer里最新可用數據已經到18號槽了,因此waitFor返回18,即告訴EventProcessor可以一直讀到第18號的所有數據。如圖7。

image圖7

這種模式提供了很好的批處理行為,可以使用這種批處理代碼來實現EventHandler,在Disruptor里性能測試FizzBuzzEventHandler就是一個很好的例子。

處理系統組件之間的依賴關系

Disruptor處理系統內部多組件的依賴關系,而不引入任何線程競爭的做法很有意思。Disruptor遵循的是單線程寫入,多線程讀取的做法。Disruptor的原始設計是支持幾步具有特定順序的串行流水線操作 - 這種操作在企業級的系統里很常見。圖8演了一個標准的三步流水線操作:

image圖8

首先,所有事件都會寫入硬盤(日志“Journaling”操作),以便容災恢復。第二所有事件會備份(Replication操作)到第二台服務器上,只有這些步驟都完成之后系統才能處理實際的業務操作(Business Logic)。

串行做這三步操作是一個合理的做法,但不是最有效率的。日志和備份操作可以並行,因為它們相互獨立。但業務操作不行,因為它依賴前兩者,圖9演示了這個依賴關系。

image圖9

如果使用Disruptor,前兩步(日志和備份)可以直接讀RingBuffer。跟圖7示意的,它們都使用一個屏障(Sequence Barrier)來得到RingBuffer下一個可讀取的序列號。它們各自維護自己的序列號,這樣方便它們自己知道已經讀到哪了,並使用BatchEventProcessor來處理事件(日志和備份)。

業務線程也會從同一個RingBuffer里讀取事件,不過只能處理前兩個線程處理完的事件。這個限制通過第二個SequenceBarrier來實現,它被配置來讀取日志線程和備份線程的序列號,返回它們的最小值,以告訴業務線程安全讀取的范圍。

只有每一個EventProcessor都使用序列號屏障(Sequence Barrier)來確定可以安全處理的事件范圍,才能從RingBuffer里讀取數據。如圖10。

image圖10

雖然有很多線程讀取不同的序利號,但由於都是簡單遞增自己內部的序利號,所以線程間沒有競爭。

多個生產線程

Disruptor也支持,但是本文沒有說如何支持,放在后面寫。

結論

雖然disruptor的原理已經比較熟悉了,但是其API還不是很了解,我寫了一個實驗性的代碼,來完善我的理解 - 不過隨着理解的深入,代碼會不斷更新:

https://github.com/shiyimin/12306ngpm/blob/8be9178d318618f905aaed45fa6025df09371c31/trunk/tpms/src/test/java/org/ng12306/tpms/DisruptorConceptProofTest.java


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM