Disruptor的使用
1.簡介
The LMAX Disruptor is a high performance inter-thread messaging library. It grew out of LMAX's research into concurrency, performance and non-blocking algorithms and today forms a core part of their Exchange's infrastructure.
(LMAX Disruptor是一個高性能的線程間消息傳遞庫。它源於LMAX對並發性、性能和非阻塞算法的研究,如今已成為其Exchange基礎架構的核心部分。)
-- 引用自GITHUB介紹
Disruptor是一個高性能的異步處理框架,或者可以認為是最快的消息框架(輕量的JMS),也可以認為是一個觀察者模式的實現,或者事件監聽模式的實現。以下是介紹wiki地址:
https://github.com/LMAX-Exchange/disruptor/wiki
2.Disruptor的設計方案
Disruptor通過以下設計來解決隊列速度慢的問題:
環形數組結構
為了避免垃圾回收,采用數組而非鏈表。同時,數組對處理器的緩存機制更加友好。
元素位置定位
數組長度2^n,通過位運算,加快定位的速度。下標采取遞增的形式。不用擔心index溢出的問題。index是long類型,即使100萬QPS的處理速度,也需要30萬年才能用完。
無鎖設計
每個生產者或者消費者線程,會先申請可以操作的元素在數組中的位置,申請到之后,直接在該位置寫入或者讀取數據。
下面忽略數組的環形結構,介紹一下如何實現無鎖設計。整個過程通過原子變量CAS,保證操作的線程安全。
3.Disruptor實現特征
另一個關鍵的實現低延遲的細節就是在Disruptor中利用無鎖的算法,所有內存的可見性和正確性都是利用內存屏障或者CAS操作。使用CAS來保證多線程安全,與大部分並發隊列使用的鎖相比,CAS顯然要快很多。CAS是CPU級別的指令,更加輕量,不必像鎖一樣需要操作系統提供支持,所以每次調用不需要在用戶態與內核態之間切換,也不需要上下文切換。
只有一個用例中鎖是必須的,那就是BlockingWaitStrategy(阻塞等待策略),唯一的實現方法就是使用Condition實現消費者在新事件到來前等待。許多低延遲系統使用忙等待去避免Condition的抖動,然而在系統忙等待的操作中,性能可能會顯著降低,尤其是在CPU資源嚴重受限的情況下,例如虛擬環境下的WEB服務器。
4.Disruptor實現生產者消費者模型
這里我們按照原作者Demo介紹制作一個放入LongValue的生產者和消費者模型,相關的代碼如下所示:
maven依賴
<dependencies>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.2.1</version>
</dependency>
</dependencies>
LongEvent
//定義事件event 通過Disruptor 進行交換的數據類型。
public class LongEvent {
private Long value;
public Long getValue() {
return value;
}
public void setValue(Long value) {
this.value = value;
}
}
LongEventFactory
public class LongEventFactory implements EventFactory<LongEvent> {
public LongEvent newInstance() {
return new LongEvent();
}
}
LongEventHandler
// 消費者獲得數據
public class LongEventHandler implements EventHandler<LongEvent> {
@Override
public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception {
System.out.println("消費者獲得數據:" + longEvent.getValue());
}
}
LongEventProducer
// 生產者
public class LongEventProducer {
private RingBuffer<LongEvent> ringBuffer;
public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void onData(ByteBuffer byteBuffer) {
// 獲取事件隊列的下表位置
long sequence = ringBuffer.next();
try {
// 取出空隊列
LongEvent longEvent = ringBuffer.get(sequence);
// 給空隊列賦值
longEvent.setValue(byteBuffer.getLong(0));
} catch (Exception e) {
e.printStackTrace();
} finally {
System.out.println("生產者發送數據....");
ringBuffer.publish(sequence);
}
}
}
MainTest
public class MainTest {
public static void main(String[] args) {
// 1. 創建線程池
ExecutorService executor = Executors.newCachedThreadPool();
// 2. 創建工廠
LongEventFactory longEventFactory = new LongEventFactory();
// 3.創建ringbuffer 大小
int ringbuffer = 1024 * 1024; // 2的N次方
// 4. 創建disruptor
Disruptor<LongEvent> longEventDisruptor = new Disruptor<>(
longEventFactory, ringbuffer, executor,
ProducerType.MULTI, new YieldingWaitStrategy()
);
// 5. 連接消費者
longEventDisruptor.handleEventsWith(new LongEventHandler());
// 6. 啟動
longEventDisruptor.start();
// 7.創建ringbuffer容器
RingBuffer<LongEvent> ringBuffer = longEventDisruptor.getRingBuffer();
// 8.創建生產者
LongEventProducer longEventProducer = new LongEventProducer(ringBuffer);
// 9. 指定緩沖區的大小
ByteBuffer byteBuffer = ByteBuffer.allocate(8);
for (int i = 0; i < 10; i++) {
byteBuffer.putLong(0,i);
longEventProducer.onData(byteBuffer);
}
executor.shutdown();
longEventDisruptor.shutdown();
}
}
執行結果如下:
生產者發送數據....
生產者發送數據....
生產者發送數據....
生產者發送數據....
消費者獲得數據:0
生產者發送數據....
消費者獲得數據:1
生產者發送數據....
消費者獲得數據:2
生產者發送數據....
消費者獲得數據:3
生產者發送數據....
消費者獲得數據:4
生產者發送數據....
消費者獲得數據:5
生產者發送數據....
消費者獲得數據:6
消費者獲得數據:7
消費者獲得數據:8
消費者獲得數據:9