架構師養成記--15.Disruptor並發框架


一、概述

disruptor對於處理並發任務很擅長,曾有人測過,一個線程里1s內可以處理六百萬個訂單,性能相當感人。

這個框架的結構大概是:數據生產端 --> 緩存 --> 消費端

緩存中的數據是主動發給消費端的,而不是像一般的生產者消費者模式那樣,消費端去緩存中取數據。

可以將disruptor理解為,基於事件驅動的高效隊列、輕量級的JMS

disruptor學習網站:http://ifeve.com/disruptor-getting-started

二、開發流程

1.建Event類(數據對象)

2.建立一個生產數據的工廠類,EventFactory,用於生產數據;

3.監聽事件類(處理Event數據)

4.實例化Disruptor,配置參數,綁定事件;

5.建存放數據的核心 RingBuffer,生產的數據放入 RungBuffer。

三、HelloWord 

1.入口

import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

public class LongEventMain {

    public static void main(String[] args) throws Exception {
        //創建緩沖池
        ExecutorService  executor = Executors.newCachedThreadPool();
        //創建工廠
        LongEventFactory factory = new LongEventFactory();
        //創建bufferSize ,也就是RingBuffer大小,必須是2的N次方
        int ringBufferSize = 1024 * 1024; // 

        /**
        //BlockingWaitStrategy 是最低效的策略,但其對CPU的消耗最小並且在各種不同部署環境中能提供更加一致的性能表現
        WaitStrategy BLOCKING_WAIT = new BlockingWaitStrategy();
        //SleepingWaitStrategy 的性能表現跟BlockingWaitStrategy差不多,對CPU的消耗也類似,但其對生產者線程的影響最小,適合用於異步日志類似的場景
        WaitStrategy SLEEPING_WAIT = new SleepingWaitStrategy();
        //YieldingWaitStrategy 的性能是最好的,適合用於低延遲的系統。在要求極高性能且事件處理線數小於CPU邏輯核心數的場景中,推薦使用此策略;例如,CPU開啟超線程的特性
        WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();
        */
        
        //創建disruptor
        Disruptor<LongEvent> disruptor = 
                new Disruptor<LongEvent>(factory, ringBufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy());
        // 連接消費事件方法
        disruptor.handleEventsWith(new LongEventHandler());
        
        // 啟動
        disruptor.start();
        
        //Disruptor 的事件發布過程是一個兩階段提交的過程:
        //發布事件
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
        
        LongEventProducer producer = new LongEventProducer(ringBuffer); 
        //LongEventProducerWithTranslator producer = new LongEventProducerWithTranslator(ringBuffer);
        ByteBuffer byteBuffer = ByteBuffer.allocate(8);
        for(long l = 0; l<100; l++){
            byteBuffer.putLong(0, l);
            producer.onData(byteBuffer);
            //Thread.sleep(1000);
        }

        
        disruptor.shutdown();//關閉 disruptor,方法會堵塞,直至所有的事件都得到處理;
        executor.shutdown();//關閉 disruptor 使用的線程池;如果需要的話,必須手動關閉, disruptor 在 shutdown 時不會自動關閉;        
        
        
    }
}

 

 

2.數據對象:

public class LongEvent { 
    private long value;
    public long getValue() { 
        return value; 
    } 
 
    public void setValue(long value) { 
        this.value = value; 
    } 
} 

3.Event工廠

import com.lmax.disruptor.EventFactory;
// 需要讓disruptor為我們創建事件,我們同時還聲明了一個EventFactory來實例化Event對象。
public class LongEventFactory implements EventFactory { 

    @Override 
    public Object newInstance() { 
        return new LongEvent(); 
    } 
} 

4.生產者

import java.nio.ByteBuffer;

import com.lmax.disruptor.RingBuffer;
/**
 * 很明顯的是:當用一個簡單隊列來發布事件的時候會牽涉更多的細節,這是因為事件對象還需要預先創建。
 * 發布事件最少需要兩步:獲取下一個事件槽並發布事件(發布事件的時候要使用try/finnally保證事件一定會被發布)。
 * 如果我們使用RingBuffer.next()獲取一個事件槽,那么一定要發布對應的事件。
 * 如果不能發布事件,那么就會引起Disruptor狀態的混亂。
 * 尤其是在多個事件生產者的情況下會導致事件消費者失速,從而不得不重啟應用才能會恢復。
 */
public class LongEventProducer {

    private final RingBuffer<LongEvent> ringBuffer;
    
    public LongEventProducer(RingBuffer<LongEvent> ringBuffer){
        this.ringBuffer = ringBuffer;
    }
    
    /**
     * onData用來發布事件,每調用一次就發布一次事件
     * 它的參數會用過事件傳遞給消費者
     */
    public void onData(ByteBuffer bb){
        //1.可以把ringBuffer看做一個事件隊列,那么next就是得到下面一個事件槽
        long sequence = ringBuffer.next();
        try {
            //2.用上面的索引取出一個空的事件用於填充(獲取該序號對應的事件對象)
            LongEvent event = ringBuffer.get(sequence);
            //3.獲取要通過事件傳遞的業務數據
            event.setValue(bb.getLong(0));
        } finally {
            //4.發布事件
            //注意,最后的 ringBuffer.publish 方法必須包含在 finally 中以確保必須得到調用;如果某個請求的 sequence 未被提交,將會堵塞后續的發布操作或者其它的 producer。
            ringBuffer.publish(sequence);
        }
    }
    
    
}

 

5.消費者

import com.lmax.disruptor.EventHandler;

//我們還需要一個事件消費者,也就是一個事件處理器。這個事件處理器簡單地把事件中存儲的數據打印到終端:
public class LongEventHandler implements EventHandler<LongEvent>  {

    @Override
    public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception {
        System.out.println(longEvent.getValue());         
    }

}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM