一、什么是 Disruptor
Disruptor是一個高性能的異步處理框架,或者可以認為是最快的消息框架(輕量的JMS),也可以認為是一個觀察者模式實現,或者事件-監聽模式的實現,直接稱disruptor模式。disruptor最大特點是高性能,其LMAX架構可以獲得每秒6百萬訂單,用1微秒的延遲獲得吞吐量為100K+。
可以理解為消費者-生產者的消息發布訂閱模式。
二、Disruptor 的核心概念
先從了解 Disruptor 的核心概念開始,來了解它是如何運作的。下面介紹的概念模型,既是領域對象,也是映射到代碼實現上的核心對象。
Ring Buffer
如其名,環形的緩沖區。曾經 RingBuffer 是 Disruptor 中的最主要的對象,但從3.0版本開始,其職責被簡化為僅僅負責對通過 Disruptor 進行交換的數據(事件)進行存儲和更新。在一些更高級的應用場景中,Ring Buffer 可以由用戶的自定義實現來完全替代。
Sequence Disruptor
通過順序遞增的序號來編號管理通過其進行交換的數據(事件),對數據(事件)的處理過程總是沿着序號逐個遞增處理。一個 Sequence 用於跟蹤標識某個特定的事件處理者( RingBuffer/Consumer )的處理進度。雖然一個 AtomicLong 也可以用於標識進度,但定義 Sequence 來負責該問題還有另一個目的,那就是防止不同的 Sequence 之間的CPU緩存偽共享(Flase Sharing)問題。
(注:這是 Disruptor 實現高性能的關鍵點之一,網上關於偽共享問題的介紹已經汗牛充棟,在此不再贅述)。
Sequencer
Sequencer 是 Disruptor 的真正核心。此接口有兩個實現類 SingleProducerSequencer、MultiProducerSequencer ,它們定義在生產者和消費者之間快速、正確地傳遞數據的並發算法。
Sequence Barrier
用於保持對RingBuffer的 main published Sequence 和Consumer依賴的其它Consumer的 Sequence 的引用。 Sequence Barrier 還定義了決定 Consumer 是否還有可處理的事件的邏輯。
Wait Strategy
定義 Consumer 如何進行等待下一個事件的策略。 (注:Disruptor 定義了多種不同的策略,針對不同的場景,提供了不一樣的性能表現)
Event
在 Disruptor 的語義中,生產者和消費者之間進行交換的數據被稱為事件(Event)。它不是一個被 Disruptor 定義的特定類型,而是由 Disruptor 的使用者定義並指定。
EventProcessor
EventProcessor 持有特定消費者(Consumer)的 Sequence,並提供用於調用事件處理實現的事件循環(Event Loop)。
EventHandler
Disruptor 定義的事件處理接口,由用戶實現,用於處理事件,是 Consumer 的真正實現。
Producer
即生產者,只是泛指調用 Disruptor 發布事件的用戶代碼,Disruptor 沒有定義特定接口或類型。
三、Disruptor入門示例代碼,進行實驗
jar版本:disruptor-3.2.1.jar
1、定義事件
事件(Event)就是通過 Disruptor 進行交換的數據類型。
package com.ljq.disruptor; import java.io.Serializable; /** * 定義事件數據,本質是個普通JavaBean * * @author jqlin */ @SuppressWarnings("serial") public class LongEvent implements Serializable { private long value; public LongEvent() { super(); } public LongEvent(long value) { super(); this.value = value; } public long getValue() { return value; } public void setValue(long value) { this.value = value; } @Override public String toString() { return "LongEvent [value=" + value + "]"; } }
2、定義事件工廠
事件工廠(Event Factory)定義了如何實例化前面第1步中定義的事件(Event),需要實現接口com.lmax.disruptor.EventFactory<T>。
Disruptor 通過 EventFactory 在 RingBuffer 中預創建 Event 的實例。一個 Event 實例實際上被用作一個“數據槽”,發布者發布前,先從 RingBuffer 獲得一個 Event 的實例,然后往 Event 實例中填充數據,之后再發布到 RingBuffer 中,之后由 Consumer 獲得該 Event 實例並從中讀取數據。
package com.ljq.disruptor; import com.lmax.disruptor.EventFactory; /** * 定義事件工廠,實例化LongEvent事件 * * @author jqlin */ public class LongEventFactory implements EventFactory<LongEvent> { public LongEvent newInstance() { return new LongEvent(); } }
3、LongEvent事件生產者
RingBuffer是消息存儲結構,為環形存儲結構,每個單元存儲一條消息。類似於隊列。當ringbuffer中數據填滿后,環就會阻塞,等待消費者消費掉數據。當所有消費者消費掉環中一個數據,新的消息才可以加入環中。每個環插入數據后,都會分配下一個位置的編號,即sequence。
Disruptor的事件發布過程是一個兩階段提交的過程:
第一步:先從 RingBuffer 獲取下一個可以寫入的事件的序號;
第二步:獲取對應的事件對象,將數據寫入事件對象;
第三部:將事件提交到 RingBuffer;
事件只有在提交之后才會通知 EventProcessor 進行處理;
package com.ljq.disruptor; import com.lmax.disruptor.RingBuffer; /** * LongEvent事件生產者,生產LongEvent事件 * * @author jqlin */ public class LongEventProducer { private final RingBuffer<LongEvent> ringBuffer; public LongEventProducer(RingBuffer<LongEvent> ringBuffer) { this.ringBuffer = ringBuffer; } public void produceData(long value) { long sequence = ringBuffer.next(); // 獲得下一個Event槽的下標 try { // 給Event填充數據 LongEvent event = ringBuffer.get(sequence); event.setValue(value); } finally { // 發布Event,激活觀察者去消費, 將sequence傳遞給該消費者 // 注意,最后的 ringBuffer.publish() 方法必須包含在 finally 中以確保必須得到調用;如果某個請求的 sequence 未被提交,將會堵塞后續的發布操作或者其它的 producer。 ringBuffer.publish(sequence); } } }
Disruptor還提供另外一種形式的調用來簡化以上操作,並確保 publish 總是得到調用。
package com.ljq.disruptor; import com.lmax.disruptor.EventTranslatorOneArg; import com.lmax.disruptor.RingBuffer; /** * LongEvent事件生產者,Disruptor提供另外一種形式的調用來簡化事件生產者的操作,並確保 publish 總是得到調用。 * * @author jqlin */ public class LongEventProducerWithTranslator { private final RingBuffer<LongEvent> ringBuffer; public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) { this.ringBuffer = ringBuffer; } public void produceData(long value) { ringBuffer.publishEvent(TRANSLATOR, value); } // 使用EventTranslator, 封裝獲取Event的過程 private static final EventTranslatorOneArg<LongEvent, Long> TRANSLATOR = new EventTranslatorOneArg<LongEvent, Long>() { @Override public void translateTo(LongEvent event, long sequeue, Long value) { event.setValue(value); } }; }
translateTo方法將RingBuffer中的消息,轉換成java對象格式。示例為LongEvent對象,后續消費者LongEventHandler處理器直接操作LongEvent對象,獲取消息各屬性信息,本示例 為value屬性。
produceData()方法,將生產者生產的消息放入RingBuffer中。
4、LongEvent事件消息者
定義事件處理的具體實現,通過實現接口 com.lmax.disruptor.EventHandler<T> 定義事件處理的具體實現。
定義如何處理消息的地方,此處執行速度要足夠快。否則,會影響RingBuffer后續沒空間加入新的數據。因此,不能做業務耗時操作。建議另外開始 java 線程池處理消息。
package com.ljq.disruptor; import com.lmax.disruptor.EventHandler; /** * LongEvent事件消息者,消息LongEvent事件 * * @author Administrator * */ public class LongEventHandler implements EventHandler<LongEvent> { @Override public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception { System.out.println("consumer:" + Thread.currentThread().getName() + " Event: value=" + event.getValue() + ",sequence=" + sequence + ",endOfBatch=" + endOfBatch); } }
5、LongEventMain
消費者-生產者啟動類,其依靠構造Disruptor對象,調用start()方法完成啟動線程。
指定等待策略
Disruptor 定義了 com.lmax.disruptor.WaitStrategy 接口用於抽象 Consumer 如何等待新事件,這是策略模式的應用。
Disruptor 提供了多個 WaitStrategy 的實現,每種策略都具有不同性能和優缺點,根據實際運行環境的 CPU 的硬件特點選擇恰當的策略,並配合特定的 JVM 的配置參數,能夠實現不同的性能提升。
例如:
BlockingWaitStrategy 最低效的策略,但其對CPU的消耗最小並且在各種不同部署環境中能提供更加一致的性能表現;
SleepingWaitStrategy 性能表現跟 BlockingWaitStrategy 差不多,對 CPU 的消耗也類似,但其對生產者線程的影響最小,適合用於異步日志類似的場景;
YieldingWaitStrategy 性能是最好的,適合用於低延遲的系統。在要求極高性能且事件處理線數小於 CPU 邏輯核心數的場景中,推薦使用此策略;例如,CPU開啟超線程的特性。
BusySpinWaitStrategy 自旋等待,類似自旋鎖. 低延遲但同時對CPU資源的占用也多
package com.ljq.disruptor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.YieldingWaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; public class LongEventMain { public static void main(String[] args) throws InterruptedException { long beginTime = System.currentTimeMillis(); // 定義用於事件處理的線程池,Disruptor 通過 java.util.concurrent.ExecutorService 提供的線程來觸發 Consumer 的事件處理 ExecutorService executor = Executors.newCachedThreadPool(); // 指定事件工廠 LongEventFactory factory = new LongEventFactory(); // 指定 ring buffer字節大小,必需為2的N次方(能將求模運算轉為位運算提高效率 ),否則影響性能 int bufferSize = 1024 * 1024; // 單線程模式,獲取額外的性能 Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, bufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy()); // 設置事件業務處理器---消費者 disruptor.handleEventsWith(new LongEventHandler()); // 啟動disruptor線程 disruptor.start(); // 獲取 ring buffer環,用於接取生產者生產的事件 RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); // 為 ring buffer指定事件生產者 LongEventProducer producer = new LongEventProducer(ringBuffer); //LongEventProducerWithTranslator producer = new LongEventProducerWithTranslator(ringBuffer); for (int i = 0; i<100000; i++) { producer.produceData(i);// 生產者生產數據 } disruptor.shutdown(); //關閉 disruptor,方法會堵塞,直至所有的事件都得到處理; executor. shutdown(); //關閉 disruptor 使用的線程池;如果需要的話,必須手動關閉, disruptor 在 shutdown 時不會自動關閉; System.out.println(String.format("總共耗時%s毫秒", (System.currentTimeMillis() - beginTime))); } }