Disruptor的使用


..................2015年的第一天...................

本文代碼托管在 https://github.com/hupengcool/disruptor-starter

Intruduction

關於吹牛逼的話就不說了。。。Disruptor是Java實現的用於線程間通信的消息組件。其核心是一個Lock-free的Ringbuffer,Disruptor使用CAS而不是Lock。與大部分並發隊列使用的Lock相比,CAS顯然要快很多。CAS是CPU級別的指令,更加輕量,不需要像Lock一樣需要OS的支持,所以每次調用不需要kernel entry,也不需要context switch。當然,使用CAS的代價是Disruptor實現的復雜程度也相對提高了。

Component

Sequence

Sequence是Disruptor最核心的組件,上面已經提到過了。生產者對RingBuffer的互斥訪問,生產者與消費者之間的協調以及消費者之間的協調,都是通過Sequence實現。幾乎每一個重要的組件都包含Sequence。那么Sequence是什么呢?首先Sequence是一個遞增的序號,說白了就是計數器;其次,由於需要在線程間共享,所以Sequence是引用傳遞,並且是線程安全的;再次,Sequence支持CAS操作;最后,為了提高效率,Sequence通過padding來避免偽共享。

RingBuffer

RingBuffer是存儲消息的地方,通過一個名為cursor的Sequence對象指示隊列的頭,協調多個生產者向RingBuffer中添加消息,並用於在消費者端判斷RingBuffer是否為空。巧妙的是,表示隊列尾的Sequence並沒有在RingBuffer中,而是由消費者維護。這樣的好處是多個消費者處理消息的方式更加靈活,可以在一個RingBuffer上實現消息的單播,多播,流水線以及它們的組合。其缺點是在生產者端判斷RingBuffer是否已滿是需要跟蹤更多的信息,為此,在RingBuffer中維護了一個名為gatingSequences的Sequence數組來跟蹤相關Seqence。

SequenceBarrier

SequenceBarrier用來在消費者之間以及消費者和RingBuffer之間建立依賴關系。在Disruptor中,依賴關系實際上指的是Sequence的大小關系,消費者A依賴於消費者B指的是消費者A的Sequence一定要小於等於消費者B的Sequence,這種大小關系決定了處理某個消息的先后順序。因為所有消費者都依賴於RingBuffer,所以消費者的Sequence一定小於等於RingBuffer中名為cursor的Sequence,即消息一定是先被生產者放到Ringbuffer中,然后才能被消費者處理。

SequenceBarrier在初始化的時候會收集需要依賴的組件的Sequence,RingBuffer的cursor會被自動的加入其中。需要依賴其他消費者和/或RingBuffer的消費者在消費下一個消息時,會先等待在SequenceBarrier上,直到所有被依賴的消費者和RingBuffer的Sequence大於等於這個消費者的Sequence。當被依賴的消費者或RingBuffer的Sequence有變化時,會通知SequenceBarrier喚醒等待在它上面的消費者。

WaitStrategy

當消費者等待在SequenceBarrier上時,有許多可選的等待策略,不同的等待策略在延遲和CPU資源的占用上有所不同,可以視應用場景選擇:

BusySpinWaitStrategy : 自旋等待,類似Linux Kernel使用的自旋鎖。低延遲但同時對CPU資源的占用也多。

BlockingWaitStrategy : 使用鎖和條件變量。CPU資源的占用少,延遲大。

SleepingWaitStrategy : 在多次循環嘗試不成功后,選擇讓出CPU,等待下次調度,多次調度后仍不成功,嘗試前睡眠一個納秒級別的時間再嘗試。這種策略平衡了延遲和CPU資源占用,但延遲不均勻。

YieldingWaitStrategy : 在多次循環嘗試不成功后,選擇讓出CPU,等待下次調。平衡了延遲和CPU資源占用,但延遲也比較均勻。

PhasedBackoffWaitStrategy : 上面多種策略的綜合,CPU資源的占用少,延遲大。

BatchEvenProcessor

在Disruptor中,消費者是以EventProcessor的形式存在的。其中一類消費者是BatchEvenProcessor。每個BatchEvenProcessor有一個Sequence,來記錄自己消費RingBuffer中消息的情況。所以,一個消息必然會被每一個BatchEvenProcessor消費。

WorkProcessor

另一類消費者是WorkProcessor。每個WorkProcessor也有一個Sequence,多個WorkProcessor還共享一個Sequence用於互斥的訪問RingBuffer。一個消息被一個WorkProcessor消費,就不會被共享一個Sequence的其他WorkProcessor消費。這個被WorkProcessor共享的Sequence相當於尾指針。

WorkerPool

共享同一個Sequence的WorkProcessor可由一個WorkerPool管理,這時,共享的Sequence也由WorkerPool創建。

Use Cases

下面以Disruptor 3.3.0版本為例介紹Disruptor的初級使用,本文並沒有用那些比較原始的API,如果想知道上面寫的一些api如何使用,可以參考 https://github.com/LMAX-Exchange/disruptor/tree/master/src/perftest/java/com/lmax/disruptor 為了簡化使用,框架提供Disruptor類來簡化使用,下面主要是使用這個類來演示。
首先定義一個Event:

/**
 * Created by hupeng on 2015/1/1.
 */
public class MyEvent {

    private long value;

    public void setValue(long value) {
        this.value = value;
    }

    @Override
    public String toString() {
        return "MyEvent{" +
                "value=" + value +
                '}';
    }
}

然后提供一個EventFactory,RingBuffer通過這factory來初始化在Event。

import com.lmax.disruptor.EventFactory;

/**
 * Created by hupeng on 2015/1/1.
 */
public class MyEventFactory implements EventFactory<MyEvent> {
    @Override
    public MyEvent newInstance() {
        return new MyEvent();
    }
}

然后寫一個Producer類,也就是消息的生產者。

import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;

/**
 * Created by hupeng on 2015/1/1.
 */
public class MyEventProducer {

    private RingBuffer<MyEvent> ringBuffer;

    public MyEventProducer(RingBuffer<MyEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    private static final EventTranslatorOneArg TRANSLATOR = new EventTranslatorOneArg<MyEvent, Long>() {

        @Override
        public void translateTo(MyEvent event, long sequence, Long value) {
            event.setValue(value);
        }
    };
    
    public void onData(final Long value) {
        ringBuffer.publishEvent(TRANSLATOR,value);
    }
}

然后寫一個EventHandler。這個就是我們定義怎么處理消息的地方。

import com.lmax.disruptor.EventHandler;

/**
 * Created by hupeng on 2015/1/1.
 */
public class MyEventHandler implements EventHandler<MyEvent> {
    @Override
    public void onEvent(MyEvent event, long sequence, boolean endOfBatch) throws Exception {
        System.out.println(event);
    }
}

主程序:

import com.lmax.disruptor.IgnoreExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import disruptor.starter.support.*;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class MyEventMain {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        int bufferSize = 1024;

        Disruptor<MyEvent> disruptor = new Disruptor<MyEvent>(new MyEventFactory(),
                bufferSize, executorService, ProducerType.SINGLE, new YieldingWaitStrategy());
        disruptor.handleExceptionsWith(new IgnoreExceptionHandler());

        disruptor.handleEventsWith(new MyEventHandler(),new MyEventHandler());
//        disruptor.handleEventsWith(new MyEventHandler()).then(new MyEventHandler());  //Pipeline
        RingBuffer<MyEvent> ringBuffer = disruptor.start();

        MyEventProducer producer = new MyEventProducer(ringBuffer);
        for (long i = 0; i < 10; i++) {
            producer.onData(i);
            Thread.sleep(1000);// wait for task execute....
        }

        disruptor.shutdown();

        ExecutorsUtils.shutdownAndAwaitTermination(executorService, 60, TimeUnit.SECONDS);
    }
}

在這個例子中輸出

MyEvent{value=0}
MyEvent{value=0}
MyEvent{value=1}
MyEvent{value=1}
MyEvent{value=2}
MyEvent{value=2}
MyEvent{value=3}
MyEvent{value=3}
MyEvent{value=4}
MyEvent{value=4}
MyEvent{value=5}
MyEvent{value=5}
MyEvent{value=6}
MyEvent{value=6}
MyEvent{value=7}
MyEvent{value=7}
MyEvent{value=8}
MyEvent{value=8}
MyEvent{value=9}
MyEvent{value=9}

可以看出每個MyEventHandler(implements EventHandler)都會處理同一條消息。另外我們還可以使用類似:

disruptor.handleEventsWith(new MyEventHandler()).then(new MyEventHandler())

這樣的方法來定義依賴關系,比如先執行哪個handler再執行哪個handler。其他比如and()詳情見api
如果我們想定義多個handler,但是同時只有一個handler處理某一條消息。可以實現WorkHandler來定義handler:

import com.lmax.disruptor.WorkHandler;

/**
 * Created by hupeng on 2015/1/1.
 */
public class MyEventWorkHandler implements WorkHandler<MyEvent> {

    private String workerName;

    public MyEventWorkHandler(String workerName) {
        this.workerName = workerName;
    }

    @Override
    public void onEvent(MyEvent event) throws Exception {
        System.out.println(workerName + " handle event:" + event);
    }
}

這時候我們改一下我們的主程序:

public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        int bufferSize = 1024;

        Disruptor<MyEvent> disruptor = new Disruptor<MyEvent>(new MyEventFactory(),
                bufferSize, executorService, ProducerType.SINGLE, new YieldingWaitStrategy());
        disruptor.handleExceptionsWith(new IgnoreExceptionHandler());
        disruptor.handleEventsWithWorkerPool(new MyEventWorkHandler("worker-1"),new MyEventWorkHandler("worker-2"));
        RingBuffer<MyEvent> ringBuffer = disruptor.start();

        MyEventProducer producer = new MyEventProducer(ringBuffer);
        for (long i = 0; i < 10; i++) {
            producer.onData(i);
            Thread.sleep(1000);// wait for task execute....
        }

        disruptor.shutdown();

        ExecutorsUtils.shutdownAndAwaitTermination(executorService, 60, TimeUnit.SECONDS);

    }

這時候我們可以看到輸出是這樣的:

worker-1 handle event:MyEvent{value=0}
worker-2 handle event:MyEvent{value=1}
worker-1 handle event:MyEvent{value=2}
worker-2 handle event:MyEvent{value=3}
worker-1 handle event:MyEvent{value=4}
worker-2 handle event:MyEvent{value=5}
worker-1 handle event:MyEvent{value=6}
worker-2 handle event:MyEvent{value=7}
worker-1 handle event:MyEvent{value=8}
worker-2 handle event:MyEvent{value=9}

一條消息只被一個handler處理。

這里的ExecutorsUtils就是寫的一個關閉ExecutorService的方法

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

public class ExecutorsUtils {

    public static  void shutdownAndAwaitTermination(ExecutorService pool,int timeout,TimeUnit unit) {
        pool.shutdown(); // Disable new tasks from being submitted
        try {
            // Wait a while for existing tasks to terminate
            if (!pool.awaitTermination(timeout/2, unit)) {
                pool.shutdownNow(); // Cancel currently executing tasks
                // Wait a while for tasks to respond to being cancelled
                if (!pool.awaitTermination(timeout/2, unit))
                    System.err.println("Pool did not terminate");
            }
        } catch (InterruptedException ie) {
            // (Re-)Cancel if current thread also interrupted
            pool.shutdownNow();
            // Preserve interrupt status
            Thread.currentThread().interrupt();
        }
    }
}

概念部分來自http://ziyue1987.github.io/pages/2013/09/22/disruptor-use-manual.html ,如果想對這個框架有更一步了解,可以點進去看看,可以參考源代碼。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM