Disruptor並發框架(一)簡介&上手demo


框架簡介

  • Martin Fowler在自己網站上寫了一篇LMAX架構的文章,在文章中他介紹了LMAX是一種新型零售金融交易平台,它能夠以很低的延遲產生大量交易。這個系統是建立在JVM平台上,其核心是一個業務邏輯處理器,它能夠在一個線程里每秒處理6百萬訂單。業務邏輯處理器完全是運行在內存中,使`用事件源驅動方式。業務邏輯處理器的核心是Disruptor。
  • Disruptor它是一個開源的並發框架,並獲得2011 Duke’s 程序框架創新獎,能夠在無鎖的情況下實現網絡的Queue並發操作。
  • Disruptor是一個高性能的異步處理框架,或者可以認為是最快的消息框架(輕量的JMS),也可以認為是一個觀察者模式的實現,或者事件監聽模式的實現。

在使用之前,首先說明disruptor主要功能加以說明,你可以理解為他是一種高效的"生產者-消費者"模型。也就性能遠遠高於傳統的BlockingQueue容器。

上手demo

  • 首先聲明一個Event來包含需要傳遞的數據:
public class LongEvent { 
    private long value;
    public long getValue() { 
        return value; 
    } 
 
    public void setValue(long value) { 
        this.value = value; 
    } 
} 

  • 於需要讓Disruptor為我們創建事件,我們同時還聲明了一個EventFactory來實例化Event對象。
// 需要讓disruptor為我們創建事件,我們同時還聲明了一個EventFactory來實例化Event對象。
public class LongEventFactory implements EventFactory { 

    @Override 
    public Object newInstance() { 
        return new LongEvent(); 
    } 
} 

  • 我們還需要一個事件消費者,也就是一個事件處理器。這個事件處理器簡單地把事件中存儲的數據打印到終端:
public class LongEventHandler implements EventHandler<LongEvent>  {

	@Override
	public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception {
		System.out.println(longEvent.getValue()); 		
	}

}

  • 事件都會有一個生成事件的源,這個例子中假設事件是由於磁盤IO或者network讀取數據的時候觸發的,事件源使用一個ByteBuffer來模擬它接受到的數據,也就是說,事件源會在IO讀取到一部分數據的時候觸發事件(觸發事件不是自動的,程序員需要在讀取到數據的時候自己觸發事件並發布)

public class LongEventProducer {

	private final RingBuffer<LongEvent> ringBuffer;
	
	public LongEventProducer(RingBuffer<LongEvent> ringBuffer){
		this.ringBuffer = ringBuffer;
	}
	
	/**
	 * onData用來發布事件,每調用一次就發布一次事件
	 * 它的參數會用過事件傳遞給消費者
	 */
	public void onData(ByteBuffer bb){
		//1.可以把ringBuffer看做一個事件隊列,那么next就是得到下面一個事件槽
        long sequence = ringBuffer.next();
		try {
			//2.用上面的索引取出一個空的事件用於填充(獲取該序號對應的事件對象)
			LongEvent event = ringBuffer.get(sequence);
			//3.獲取要通過事件傳遞的業務數據
			event.setValue(bb.getLong(0));
		} finally {
			//4.發布事件
			//注意,最后的 ringBuffer.publish 方法必須包含在 finally 中以確保必須得到調用;
            // 如果某個請求的 sequence 未被提交,將會堵塞后續的發布操作或者其它的 producer。
			ringBuffer.publish(sequence);
		}
	}
	
}
  • main函數執行調用
public class LongEventMain {

	public static void main(String[] args) throws Exception {
		//創建緩沖池
		ExecutorService  executor = Executors.newCachedThreadPool();
		//創建工廠
		LongEventFactory factory = new LongEventFactory();
		//創建bufferSize ,也就是RingBuffer大小,必須是2的N次方
		int ringBufferSize = 1024 * 1024; // 

		/**
		//BlockingWaitStrategy 是最低效的策略,但其對CPU的消耗最小並且在各種不同部署環境中能提供更加一致的性能表現
		WaitStrategy BLOCKING_WAIT = new BlockingWaitStrategy();
		//SleepingWaitStrategy 的性能表現跟BlockingWaitStrategy差不多,對CPU的消耗也類似,但其對生產者線程的影響最小,適合用於異步日志類似的場景
		WaitStrategy SLEEPING_WAIT = new SleepingWaitStrategy();
		//YieldingWaitStrategy 的性能是最好的,適合用於低延遲的系統。在要求極高性能且事件處理線數小於CPU邏輯核心數的場景中,推薦使用此策略;例如,CPU開啟超線程的特性
		WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();
		*/


        /**
         *  參數說明:
         */
		
		//創建disruptor
		Disruptor<LongEvent> disruptor = 
                    new Disruptor<LongEvent>(factory, ringBufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy());
		// 連接消費事件方法
		disruptor.handleEventsWith(new LongEventHandler());
		
		// 啟動
		disruptor.start();
		
		//Disruptor 的事件發布過程是一個兩階段提交的過程:
		//發布事件
		RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
		
		LongEventProducer producer = new LongEventProducer(ringBuffer); 
		//LongEventProducerWithTranslator producer = new LongEventProducerWithTranslator(ringBuffer);
		ByteBuffer byteBuffer = ByteBuffer.allocate(8);
		for(long l = 0; l<100; l++){
			byteBuffer.putLong(0, l);
			producer.onData(byteBuffer);
			//Thread.sleep(1000);
		}

		
		disruptor.shutdown();//關閉 disruptor,方法會堵塞,直至所有的事件都得到處理;
		executor.shutdown();//關閉 disruptor 使用的線程池;如果需要的話,必須手動關閉, disruptor 在 shutdown 時不會自動關閉;		
	 
	}
}

參考資料:http://ifeve.com/disruptor-getting-started/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM