框架簡介
- Martin Fowler在自己網站上寫了一篇LMAX架構的文章,在文章中他介紹了LMAX是一種新型零售金融交易平台,它能夠以很低的延遲產生大量交易。這個系統是建立在JVM平台上,其核心是一個業務邏輯處理器,它能夠在一個線程里每秒處理6百萬訂單。業務邏輯處理器完全是運行在
內存
中,使`用事件源驅動
方式。業務邏輯處理器的核心是Disruptor。 - Disruptor它是一個開源的並發框架,並獲得2011 Duke’s 程序框架創新獎,能夠在無鎖的情況下實現網絡的Queue並發操作。
- Disruptor是一個高性能的異步處理框架,或者可以認為是最快的消息框架(輕量的JMS),也可以認為是一個觀察者模式的實現,或者事件監聽模式的實現。
在使用之前,首先說明disruptor主要功能加以說明,你可以理解為他是一種高效的"生產者-消費者"模型。也就性能遠遠高於傳統的BlockingQueue容器。
上手demo
- 首先聲明一個Event來包含需要傳遞的數據:
public class LongEvent {
private long value;
public long getValue() {
return value;
}
public void setValue(long value) {
this.value = value;
}
}
- 於需要讓Disruptor為我們創建事件,我們同時還聲明了一個EventFactory來實例化Event對象。
// 需要讓disruptor為我們創建事件,我們同時還聲明了一個EventFactory來實例化Event對象。
public class LongEventFactory implements EventFactory {
@Override
public Object newInstance() {
return new LongEvent();
}
}
- 我們還需要一個事件消費者,也就是一個事件處理器。這個事件處理器簡單地把事件中存儲的數據打印到終端:
public class LongEventHandler implements EventHandler<LongEvent> {
@Override
public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception {
System.out.println(longEvent.getValue());
}
}
- 事件都會有一個生成事件的源,這個例子中假設事件是由於磁盤IO或者network讀取數據的時候觸發的,事件源使用一個ByteBuffer來模擬它接受到的數據,也就是說,事件源會在IO讀取到一部分數據的時候觸發事件(觸發事件不是自動的,程序員需要在讀取到數據的時候自己觸發事件並發布)
public class LongEventProducer {
private final RingBuffer<LongEvent> ringBuffer;
public LongEventProducer(RingBuffer<LongEvent> ringBuffer){
this.ringBuffer = ringBuffer;
}
/**
* onData用來發布事件,每調用一次就發布一次事件
* 它的參數會用過事件傳遞給消費者
*/
public void onData(ByteBuffer bb){
//1.可以把ringBuffer看做一個事件隊列,那么next就是得到下面一個事件槽
long sequence = ringBuffer.next();
try {
//2.用上面的索引取出一個空的事件用於填充(獲取該序號對應的事件對象)
LongEvent event = ringBuffer.get(sequence);
//3.獲取要通過事件傳遞的業務數據
event.setValue(bb.getLong(0));
} finally {
//4.發布事件
//注意,最后的 ringBuffer.publish 方法必須包含在 finally 中以確保必須得到調用;
// 如果某個請求的 sequence 未被提交,將會堵塞后續的發布操作或者其它的 producer。
ringBuffer.publish(sequence);
}
}
}
- main函數執行調用
public class LongEventMain {
public static void main(String[] args) throws Exception {
//創建緩沖池
ExecutorService executor = Executors.newCachedThreadPool();
//創建工廠
LongEventFactory factory = new LongEventFactory();
//創建bufferSize ,也就是RingBuffer大小,必須是2的N次方
int ringBufferSize = 1024 * 1024; //
/**
//BlockingWaitStrategy 是最低效的策略,但其對CPU的消耗最小並且在各種不同部署環境中能提供更加一致的性能表現
WaitStrategy BLOCKING_WAIT = new BlockingWaitStrategy();
//SleepingWaitStrategy 的性能表現跟BlockingWaitStrategy差不多,對CPU的消耗也類似,但其對生產者線程的影響最小,適合用於異步日志類似的場景
WaitStrategy SLEEPING_WAIT = new SleepingWaitStrategy();
//YieldingWaitStrategy 的性能是最好的,適合用於低延遲的系統。在要求極高性能且事件處理線數小於CPU邏輯核心數的場景中,推薦使用此策略;例如,CPU開啟超線程的特性
WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();
*/
/**
* 參數說明:
*/
//創建disruptor
Disruptor<LongEvent> disruptor =
new Disruptor<LongEvent>(factory, ringBufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy());
// 連接消費事件方法
disruptor.handleEventsWith(new LongEventHandler());
// 啟動
disruptor.start();
//Disruptor 的事件發布過程是一個兩階段提交的過程:
//發布事件
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
LongEventProducer producer = new LongEventProducer(ringBuffer);
//LongEventProducerWithTranslator producer = new LongEventProducerWithTranslator(ringBuffer);
ByteBuffer byteBuffer = ByteBuffer.allocate(8);
for(long l = 0; l<100; l++){
byteBuffer.putLong(0, l);
producer.onData(byteBuffer);
//Thread.sleep(1000);
}
disruptor.shutdown();//關閉 disruptor,方法會堵塞,直至所有的事件都得到處理;
executor.shutdown();//關閉 disruptor 使用的線程池;如果需要的話,必須手動關閉, disruptor 在 shutdown 時不會自動關閉;
}
}