Disruptor快速入門


JDK的多線程與並發庫一文中, 提到了BlockingQueue實現了生產者-消費者模型

BlockingQueue是基於鎖實現的, 而鎖的效率通常較低. 有沒有使用CAS機制實現的生產者-消費者?

Disruptor就是這樣.

disruptor使用觀察者模式, 主動將消息發送給消費者, 而不是等消費者從隊列中取; 在無鎖的情況下, 實現queue(環形, RingBuffer)的並發操作, 性能遠高於BlockingQueue

 

1.生產者-消費者

1.1使用Disruptor類

RingBuffer通過Disruptor實例獲得

public class Client {

    public static void main(String[] args) throws Exception {
        
        //1.配置並獲得Disruptor
        ExecutorService  executor = Executors.newCachedThreadPool(); 
        LongEventFactory factory = new LongEventFactory();
        // 設置RingBuffer大小, 需為2的N次方(能將求模運算轉為位運算提高效率 ), 否則影響性能
        int ringBufferSize = 1024 * 1024; 

        //創建disruptor, 泛型參數:傳遞的事件的類型
        // 第一個參數: 產生Event的工廠類, Event封裝生成-消費的數據
        // 第二個參數: RingBuffer的緩沖區大小
        // 第三個參數: 線程池
        // 第四個參數: SINGLE單個生產者, MULTI多個生產者
        // 第五個參數: WaitStrategy 當消費者阻塞在SequenceBarrier上, 消費者如何等待的策略. 
            //BlockingWaitStrategy 使用鎖和條件變量, 效率較低, 但CPU的消耗最小, 在不同部署環境下性能表現比較一致
            //SleepingWaitStrategy 多次循環嘗試不成功后, 讓出CPU, 等待下次調度; 多次調度后仍不成功, 睡眠納秒級別的時間再嘗試. 平衡了延遲和CPU資源占用, 但延遲不均勻.
            //YieldingWaitStrategy 多次循環嘗試不成功后, 讓出CPU, 等待下次調度. 平衡了延遲和CPU資源占用, 延遲也比較均勻.
            //BusySpinWaitStrategy 自旋等待,類似自旋鎖. 低延遲但同時對CPU資源的占用也多.
        Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, ringBufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy());
        // 注冊事件消費處理器, 也即消費者. 可傳入多個EventHandler ...
        disruptor.handleEventsWith(new LongEventHandler());
        // 啟動
        disruptor.start();
        
        //2.將數據裝入RingBuffer
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
        // 創建生產者, 以下方式一使用原始api, 方式二使用新API
        //LongEventProducer producer = new LongEventProducer(ringBuffer); 
        LongEventProducerWithTranslator producer = new LongEventProducerWithTranslator(ringBuffer);
        
        ByteBuffer byteBuffer = ByteBuffer.allocate(8); // 這里只是筆者實驗, 不是必須要用ByteBuffer保存long數據
        for(int i = 0; i < 100; ++i){
            byteBuffer.putLong(0, i);
            producer.produceData(byteBuffer);
        }

        disruptor.shutdown(); //關閉 disruptor  阻塞直至所有事件都得到處理
        executor.shutdown(); // 需關閉 disruptor使用的線程池, 上一步disruptor關閉時不會連帶關閉線程池        
    }
}
// Event封裝要傳遞的數據 
public class LongEvent { 
    private long value;
    public long getValue() { 
        return value; 
    } 
    public void setValue(long value) { 
        this.value = value; 
    } 
} 
// 產生Event的工廠
public class LongEventFactory implements EventFactory { 
    @Override 
    public Object newInstance() { 
        return new LongEvent(); 
    } 
} 
public class LongEventHandler implements EventHandler<LongEvent>  {
    // 消費邏輯
    @Override
    public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception {
        System.out.println(longEvent.getValue());         
    }
}
//生產者實現一
public class LongEventProducer {
    // 生產者持有RingBuffer的引用
    private final RingBuffer<LongEvent> ringBuffer;
    
    public LongEventProducer(RingBuffer<LongEvent> ringBuffer){
        this.ringBuffer = ringBuffer;
    }
    
    public void produceData(ByteBuffer bb){
        // 獲得下一個Event槽的下標
        long sequence = ringBuffer.next();
        try {
            // 給Event填充數據
            LongEvent event = ringBuffer.get(sequence);
            event.setValue(bb.getLong(0));
        } finally {
            // 發布Event, 激活觀察者去消費, 將sequence傳遞給該消費者
            //publish應該放在 finally塊中以確保一定會被調用->如果某個事件槽被獲取但未提交, 將會堵塞后續的publish動作。
            ringBuffer.publish(sequence);
        }
    }
}
//生產者實現二
public class LongEventProducerWithTranslator {

    // 使用EventTranslator, 封裝 獲取Event的過程
    private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR = new EventTranslatorOneArg<LongEvent, ByteBuffer>() {
        @Override
        public void translateTo(LongEvent event, long sequeue, ByteBuffer buffer) {
            event.setValue(buffer.getLong(0));
        }
    };
    
    private final RingBuffer<LongEvent> ringBuffer;
    
    public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }
    
    public void produceData(ByteBuffer buffer){
        // 發布
        ringBuffer.publishEvent(TRANSLATOR, buffer);
    }
}

 

1.2 直接使用RingBuffer

給出了兩種方式:EventProcessor與WorkPool(可處理多消費者)

public class ClientForEventProcessor {  
   
    public static void main(String[] args) throws Exception {  
        int BUFFER_SIZE = 1024;
        int THREAD_NUMBERS = 4;
        
        // 這里直接獲得RingBuffer. createSingleProducer創建一個單生產者的RingBuffer
        
        // 第一個參數為EventFactory,產生數據Trade的工廠類
        // 第二個參數是RingBuffer的大小,需為2的N次方     
        // 第三個參數是WaitStrategy, 消費者阻塞時如何等待生產者放入Event
        final RingBuffer<Trade> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<Trade>() {  
            @Override  
            public Trade newInstance() {  
                return new Trade(UUID.randomUUID().toString());
            }  
        }, BUFFER_SIZE, new YieldingWaitStrategy());  
        
        //SequenceBarrier, 協調消費者與生產者, 消費者鏈的先后順序. 阻塞后面的消費者(沒有Event可消費時)
        SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();  
          
        //創建事件處理器 (消費者): 處理ringBuffer, 用TradeHandler的方法處理(實現EventHandler), 用sequenceBarrier協調生成-消費
        //如果存在多個消費者(老api, 可用workpool解決) 那重復執行 創建事件處理器-注冊進度-提交消費者的過程, 把其中TradeHandler換成其它消費者類  
        BatchEventProcessor<Trade> transProcessor = new BatchEventProcessor<Trade>(ringBuffer, sequenceBarrier, new TradeHandler());  
        //把消費者的消費進度情況注冊給RingBuffer結構(生產者)    !如果只有一個消費者的情況可以省略 
        ringBuffer.addGatingSequences(transProcessor.getSequence());  
          
        //創建線程池  
        ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMBERS);  
        //把消費者提交到線程池, !說明EventProcessor實現了callable接口  
        executors.submit(transProcessor);  
        
        // 生產者, 這里新建線程不是必要的
        Future<?> future= executors.submit(new Callable<Void>() {  
            @Override  
            public Void call() throws Exception {  
                long seq;  
                for (int i = 0; i < 10; i++) {
                    seq = ringBuffer.next();
                    ringBuffer.get(seq).setPrice(Math.random() * 9999);
                    ringBuffer.publish(seq);
                } 
                return null;  
            }  
        }); 

        Thread.sleep(1000); //等上1秒,等待消費完成  
        transProcessor.halt(); //通知事件處理器  可以結束了(並不是馬上結束!)  
        executors.shutdown(); 
    }  
}  
public class ClientForWorkPool {  
    public static void main(String[] args) throws InterruptedException {  
        int BUFFER_SIZE = 1024;
        int THREAD_NUMBERS = 4;
        
        RingBuffer<Trade> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<Trade>() {  
            public Trade newInstance() {  
                return new Trade(UUID.randomUUID().toString());
            }  
        }, BUFFER_SIZE);  
       
        SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();  
        
        // 第三個參數: 異常處理器, 這里用ExceptionHandler; 第四個參數WorkHandler的實現類, 可為數組(即傳入多個消費者)
        WorkerPool<Trade> workerPool = new WorkerPool<Trade>(ringBuffer, sequenceBarrier, new IgnoreExceptionHandler(), new TradeHandler());  
          
        ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMBERS);  
        workerPool.start(executors);  
          
        // 生產10個數據
        for (int i = 0; i < 8; i++) {
            long seq = ringBuffer.next();
            ringBuffer.get(seq).setPrice(Math.random() * 9999);
            ringBuffer.publish(seq);
        }
          
        Thread.sleep(1000);  
        workerPool.halt();  
        executors.shutdown();  
    }  
}  
// 封裝交易數據
public class Trade {  
    private String id; // 訂單ID  
    private String name;
    private double price; // 金額  
    
    public Trade(String id) {
        this.id = id;
    }
    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public double getPrice() {
        return price;
    }
    public void setPrice(double price) {
        this.price = price;
    }
} 
// 消費者, 這里實現一個接口就行, 寫兩個是為了同時測試EventProcessor和WorkPool
public class TradeHandler implements EventHandler<Trade>, WorkHandler<Trade> {  
    @Override  
    public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {  
        this.onEvent(event);  
    }  
    @Override  
    public void onEvent(Trade event) throws Exception {  
        //具體的消費邏輯  
        System.out.println(event.getId());  
    }  
}  

 

1.3 多生產者-多消費者

一個Event只能被某一個消費者處理

public static void main(String[] args) throws Exception {
        //創建RingBuffer
        RingBuffer<Order> ringBuffer = 
                RingBuffer.create(ProducerType.MULTI, 
                        new EventFactory<Order>() {  
                            @Override  
                            public Order newInstance() {  
                                return new Order();  
                            }  
                        }, 
                        1024 * 1024, new YieldingWaitStrategy());
        
        SequenceBarrier barriers = ringBuffer.newBarrier();
        
        Consumer[] consumers = new Consumer[3];
        for(int i = 0; i < consumers.length; i++){
            consumers[i] = new Consumer("ct" + i);
        }
        // 3個消費者
        WorkerPool<Order> workerPool = new WorkerPool<Order>(ringBuffer, barriers, new MyExceptionHandler(), consumers);
        
        ringBuffer.addGatingSequences(workerPool.getWorkerSequences());  
        ExecutorService executors = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        workerPool.start(executors);  
        // 10個生產者, 每個生成者生產20個數據
        for (int i = 0; i < 10; i++) {  
            final Producer p = new Producer(ringBuffer);
            new Thread(new Runnable() {
                @Override
                public void run() {
                    for(int j = 0; j < 2; j++){
                        p.produceData(UUID.randomUUID().toString());
                    }
                }
            }).start();
        } 
        
        System.out.println("----開始生產----");
        Thread.sleep(1000);  // 等待消費完成
        System.out.println("總共消費數量:" + consumers[0].getCount() );
        
        workerPool.halt(); 
        executors.shutdown();
    }
    
    static class MyExceptionHandler implements ExceptionHandler {  
        public void handleEventException(Throwable ex, long sequence, Object event) {}  
        public void handleOnStartException(Throwable ex) {}  
        public void handleOnShutdownException(Throwable ex) {}  
    } 
}
public class Consumer implements WorkHandler<Order>{
    
    private String consumerId;
    // 消費計數器
    private static AtomicInteger count = new AtomicInteger(0);
    
    public Consumer(String consumerId){
        this.consumerId = consumerId;
    }

    @Override
    public void onEvent(Order order) throws Exception {
        System.out.println("當前消費者: " + this.consumerId + ", 消費信息: " + order.getId());
        count.incrementAndGet();
    }
    
    public int getCount(){
        return count.get();
    }
}
public class Producer {

    private final RingBuffer<Order> ringBuffer;
    public Producer(RingBuffer<Order> ringBuffer){
        this.ringBuffer = ringBuffer;
    }
    public void produceData(String data){
        long sequence = ringBuffer.next();
        try {
            Order order = ringBuffer.get(sequence);
            order.setId(data);
        } finally {
            ringBuffer.publish(sequence);
        }
    }
}
public class Order {  
    private String id;
    private String name;
    private double price;
    
    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public double getPrice() {
        return price;
    }
    public void setPrice(double price) {
        this.price = price;
    }
}  

 

2. 並行處理

除了實現生產者-消費者模型, Disruptor還可以進行多路並行處理(一個Event可以進入多個路徑同時進行處理, 因為不同路徑操作的是同一個Event, 路徑可以匯合)

public class Client {  
    public static void main(String[] args) throws InterruptedException {  
       
        long beginTime=System.currentTimeMillis();  
        int bufferSize=1024;  
        ExecutorService executor=Executors.newFixedThreadPool(7);  // 注意: 線程數>=handler數+1

        Disruptor<Trade> disruptor = new Disruptor<Trade>(
                new EventFactory<Trade>() {
                    @Override
                    public Trade newInstance() {
                        return new Trade(UUID.randomUUID().toString());
                    }
                }, bufferSize, executor, ProducerType.SINGLE, new BusySpinWaitStrategy());
        // 菱形操作   
        /*
        // 創建消費者組(含H1,H2)   H1,H2並行執行無先后順序
        EventHandlerGroup<Trade> handlerGroup = disruptor.handleEventsWith(new Handler1(), new Handler2());
        // C1,C2都完成后執行C3, 像JMS傳遞消息
        handlerGroup.then(new Handler3());
        */
        
        // 順序操作
        /*
        disruptor.handleEventsWith(new Handler1()).handleEventsWith(new Handler2()).handleEventsWith(new Handler3());
        */
        
        // 六邊形操作. H1, H4串行執行; H2, H5串行執行; 而H1,H4 與 H2,H5 並行執行
        Handler1 h1 = new Handler1();
        Handler2 h2 = new Handler2();
        Handler3 h3 = new Handler3();
        Handler4 h4 = new Handler4();
        Handler5 h5 = new Handler5();
        disruptor.handleEventsWith(h1, h2);
        disruptor.after(h1).handleEventsWith(h4);
        disruptor.after(h2).handleEventsWith(h5);
        disruptor.after(h4, h5).handleEventsWith(h3);
        
        disruptor.start(); 
// 啟動生產線程  
        executor.submit(new TradePublisher(disruptor));
        Thread.sleep(1000); // 等待消費完成
       
        disruptor.shutdown();  
        executor.shutdown();  
        System.out.println("總耗時:"+(System.currentTimeMillis()-beginTime));  
    }  
}  
public class TradePublisher implements Runnable {

    private Disruptor<Trade> disruptor;
private static final int LOOP = 100;// 模擬百次交易的發生 public TradePublisher(Disruptor<Trade> disruptor) { this.disruptor = disruptor; } @Override public void run() { TradeEventTranslator tradeTransloator = new TradeEventTranslator(); for (int i = 0; i < LOOP; i++) { disruptor.publishEvent(tradeTransloator); } } } class TradeEventTranslator implements EventTranslator<Trade> { private Random random = new Random(); @Override public void translateTo(Trade event, long sequence) { this.generateTrade(event); } private Trade generateTrade(Trade trade) { trade.setPrice(random.nextDouble() * 9999); return trade; } }
public class Handler1 implements EventHandler<Trade> {
    @Override
    public void onEvent(Trade event, long sequence, boolean endOfBatch)
            throws Exception {
        System.out.println("handler1: set name");
        event.setName("h1");
    }
}
public class Handler2 implements EventHandler<Trade> {  
    @Override  
    public void onEvent(Trade event, long sequence,  boolean endOfBatch) throws Exception {  
        System.out.println("handler2: set price");
        event.setPrice(17.0);
    }  
}  
public class Handler3 implements EventHandler<Trade> {
    @Override  
    public void onEvent(Trade event, long sequence,  boolean endOfBatch) throws Exception {  
        System.out.println("handler3: name: " + event.getName() + " , price: " + event.getPrice() + ";  instance: " + event.getId());
    }  
}
public class Handler4 implements EventHandler<Trade> {  
    @Override  
    public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {  
        System.out.println("handler4: append name");
        event.setName(event.getName() + "h4");
    }  
}  
public class Handler5 implements EventHandler<Trade> {    
    @Override  
    public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {  
        System.out.println("handler5: add price");
        event.setPrice(event.getPrice() + 3.0);
    }  
}  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM