..................2015年的第一天...................
本文代碼托管在 https://github.com/hupengcool/disruptor-starter
Intruduction
關於吹牛逼的話就不說了。。。Disruptor是Java實現的用於線程間通信的消息組件。其核心是一個Lock-free的Ringbuffer,Disruptor使用CAS而不是Lock。與大部分並發隊列使用的Lock相比,CAS顯然要快很多。CAS是CPU級別的指令,更加輕量,不需要像Lock一樣需要OS的支持,所以每次調用不需要kernel entry,也不需要context switch。當然,使用CAS的代價是Disruptor實現的復雜程度也相對提高了。
Component
Sequence
Sequence是Disruptor最核心的組件,上面已經提到過了。生產者對RingBuffer的互斥訪問,生產者與消費者之間的協調以及消費者之間的協調,都是通過Sequence實現。幾乎每一個重要的組件都包含Sequence。那么Sequence是什么呢?首先Sequence是一個遞增的序號,說白了就是計數器;其次,由於需要在線程間共享,所以Sequence是引用傳遞,並且是線程安全的;再次,Sequence支持CAS操作;最后,為了提高效率,Sequence通過padding來避免偽共享。
RingBuffer
RingBuffer是存儲消息的地方,通過一個名為cursor的Sequence對象指示隊列的頭,協調多個生產者向RingBuffer中添加消息,並用於在消費者端判斷RingBuffer是否為空。巧妙的是,表示隊列尾的Sequence並沒有在RingBuffer中,而是由消費者維護。這樣的好處是多個消費者處理消息的方式更加靈活,可以在一個RingBuffer上實現消息的單播,多播,流水線以及它們的組合。其缺點是在生產者端判斷RingBuffer是否已滿是需要跟蹤更多的信息,為此,在RingBuffer中維護了一個名為gatingSequences的Sequence數組來跟蹤相關Seqence。
SequenceBarrier
SequenceBarrier用來在消費者之間以及消費者和RingBuffer之間建立依賴關系。在Disruptor中,依賴關系實際上指的是Sequence的大小關系,消費者A依賴於消費者B指的是消費者A的Sequence一定要小於等於消費者B的Sequence,這種大小關系決定了處理某個消息的先后順序。因為所有消費者都依賴於RingBuffer,所以消費者的Sequence一定小於等於RingBuffer中名為cursor的Sequence,即消息一定是先被生產者放到Ringbuffer中,然后才能被消費者處理。
SequenceBarrier在初始化的時候會收集需要依賴的組件的Sequence,RingBuffer的cursor會被自動的加入其中。需要依賴其他消費者和/或RingBuffer的消費者在消費下一個消息時,會先等待在SequenceBarrier上,直到所有被依賴的消費者和RingBuffer的Sequence大於等於這個消費者的Sequence。當被依賴的消費者或RingBuffer的Sequence有變化時,會通知SequenceBarrier喚醒等待在它上面的消費者。
WaitStrategy
當消費者等待在SequenceBarrier上時,有許多可選的等待策略,不同的等待策略在延遲和CPU資源的占用上有所不同,可以視應用場景選擇:
BusySpinWaitStrategy : 自旋等待,類似Linux Kernel使用的自旋鎖。低延遲但同時對CPU資源的占用也多。
BlockingWaitStrategy : 使用鎖和條件變量。CPU資源的占用少,延遲大。
SleepingWaitStrategy : 在多次循環嘗試不成功后,選擇讓出CPU,等待下次調度,多次調度后仍不成功,嘗試前睡眠一個納秒級別的時間再嘗試。這種策略平衡了延遲和CPU資源占用,但延遲不均勻。
YieldingWaitStrategy : 在多次循環嘗試不成功后,選擇讓出CPU,等待下次調。平衡了延遲和CPU資源占用,但延遲也比較均勻。
PhasedBackoffWaitStrategy : 上面多種策略的綜合,CPU資源的占用少,延遲大。
BatchEvenProcessor
在Disruptor中,消費者是以EventProcessor的形式存在的。其中一類消費者是BatchEvenProcessor。每個BatchEvenProcessor有一個Sequence,來記錄自己消費RingBuffer中消息的情況。所以,一個消息必然會被每一個BatchEvenProcessor消費。
WorkProcessor
另一類消費者是WorkProcessor。每個WorkProcessor也有一個Sequence,多個WorkProcessor還共享一個Sequence用於互斥的訪問RingBuffer。一個消息被一個WorkProcessor消費,就不會被共享一個Sequence的其他WorkProcessor消費。這個被WorkProcessor共享的Sequence相當於尾指針。
WorkerPool
共享同一個Sequence的WorkProcessor可由一個WorkerPool管理,這時,共享的Sequence也由WorkerPool創建。
Use Cases
下面以Disruptor 3.3.0版本為例介紹Disruptor的初級使用,本文並沒有用那些比較原始的API,如果想知道上面寫的一些api如何使用,可以參考 https://github.com/LMAX-Exchange/disruptor/tree/master/src/perftest/java/com/lmax/disruptor 為了簡化使用,框架提供Disruptor類來簡化使用,下面主要是使用這個類來演示。
首先定義一個Event:
/**
* Created by hupeng on 2015/1/1.
*/
public class MyEvent {
private long value;
public void setValue(long value) {
this.value = value;
}
@Override
public String toString() {
return "MyEvent{" +
"value=" + value +
'}';
}
}
然后提供一個EventFactory,RingBuffer通過這factory來初始化在Event。
import com.lmax.disruptor.EventFactory;
/**
* Created by hupeng on 2015/1/1.
*/
public class MyEventFactory implements EventFactory<MyEvent> {
@Override
public MyEvent newInstance() {
return new MyEvent();
}
}
然后寫一個Producer類,也就是消息的生產者。
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;
/**
* Created by hupeng on 2015/1/1.
*/
public class MyEventProducer {
private RingBuffer<MyEvent> ringBuffer;
public MyEventProducer(RingBuffer<MyEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
private static final EventTranslatorOneArg TRANSLATOR = new EventTranslatorOneArg<MyEvent, Long>() {
@Override
public void translateTo(MyEvent event, long sequence, Long value) {
event.setValue(value);
}
};
public void onData(final Long value) {
ringBuffer.publishEvent(TRANSLATOR,value);
}
}
然后寫一個EventHandler。這個就是我們定義怎么處理消息的地方。
import com.lmax.disruptor.EventHandler;
/**
* Created by hupeng on 2015/1/1.
*/
public class MyEventHandler implements EventHandler<MyEvent> {
@Override
public void onEvent(MyEvent event, long sequence, boolean endOfBatch) throws Exception {
System.out.println(event);
}
}
主程序:
import com.lmax.disruptor.IgnoreExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import disruptor.starter.support.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class MyEventMain {
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(2);
int bufferSize = 1024;
Disruptor<MyEvent> disruptor = new Disruptor<MyEvent>(new MyEventFactory(),
bufferSize, executorService, ProducerType.SINGLE, new YieldingWaitStrategy());
disruptor.handleExceptionsWith(new IgnoreExceptionHandler());
disruptor.handleEventsWith(new MyEventHandler(),new MyEventHandler());
// disruptor.handleEventsWith(new MyEventHandler()).then(new MyEventHandler()); //Pipeline
RingBuffer<MyEvent> ringBuffer = disruptor.start();
MyEventProducer producer = new MyEventProducer(ringBuffer);
for (long i = 0; i < 10; i++) {
producer.onData(i);
Thread.sleep(1000);// wait for task execute....
}
disruptor.shutdown();
ExecutorsUtils.shutdownAndAwaitTermination(executorService, 60, TimeUnit.SECONDS);
}
}
在這個例子中輸出
MyEvent{value=0}
MyEvent{value=0}
MyEvent{value=1}
MyEvent{value=1}
MyEvent{value=2}
MyEvent{value=2}
MyEvent{value=3}
MyEvent{value=3}
MyEvent{value=4}
MyEvent{value=4}
MyEvent{value=5}
MyEvent{value=5}
MyEvent{value=6}
MyEvent{value=6}
MyEvent{value=7}
MyEvent{value=7}
MyEvent{value=8}
MyEvent{value=8}
MyEvent{value=9}
MyEvent{value=9}
可以看出每個MyEventHandler(implements EventHandler)都會處理同一條消息。另外我們還可以使用類似:
disruptor.handleEventsWith(new MyEventHandler()).then(new MyEventHandler())
這樣的方法來定義依賴關系,比如先執行哪個handler再執行哪個handler。其他比如and()詳情見api
如果我們想定義多個handler,但是同時只有一個handler處理某一條消息。可以實現WorkHandler來定義handler:
import com.lmax.disruptor.WorkHandler;
/**
* Created by hupeng on 2015/1/1.
*/
public class MyEventWorkHandler implements WorkHandler<MyEvent> {
private String workerName;
public MyEventWorkHandler(String workerName) {
this.workerName = workerName;
}
@Override
public void onEvent(MyEvent event) throws Exception {
System.out.println(workerName + " handle event:" + event);
}
}
這時候我們改一下我們的主程序:
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(2);
int bufferSize = 1024;
Disruptor<MyEvent> disruptor = new Disruptor<MyEvent>(new MyEventFactory(),
bufferSize, executorService, ProducerType.SINGLE, new YieldingWaitStrategy());
disruptor.handleExceptionsWith(new IgnoreExceptionHandler());
disruptor.handleEventsWithWorkerPool(new MyEventWorkHandler("worker-1"),new MyEventWorkHandler("worker-2"));
RingBuffer<MyEvent> ringBuffer = disruptor.start();
MyEventProducer producer = new MyEventProducer(ringBuffer);
for (long i = 0; i < 10; i++) {
producer.onData(i);
Thread.sleep(1000);// wait for task execute....
}
disruptor.shutdown();
ExecutorsUtils.shutdownAndAwaitTermination(executorService, 60, TimeUnit.SECONDS);
}
這時候我們可以看到輸出是這樣的:
worker-1 handle event:MyEvent{value=0}
worker-2 handle event:MyEvent{value=1}
worker-1 handle event:MyEvent{value=2}
worker-2 handle event:MyEvent{value=3}
worker-1 handle event:MyEvent{value=4}
worker-2 handle event:MyEvent{value=5}
worker-1 handle event:MyEvent{value=6}
worker-2 handle event:MyEvent{value=7}
worker-1 handle event:MyEvent{value=8}
worker-2 handle event:MyEvent{value=9}
一條消息只被一個handler處理。
這里的ExecutorsUtils就是寫的一個關閉ExecutorService的方法
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
public class ExecutorsUtils {
public static void shutdownAndAwaitTermination(ExecutorService pool,int timeout,TimeUnit unit) {
pool.shutdown(); // Disable new tasks from being submitted
try {
// Wait a while for existing tasks to terminate
if (!pool.awaitTermination(timeout/2, unit)) {
pool.shutdownNow(); // Cancel currently executing tasks
// Wait a while for tasks to respond to being cancelled
if (!pool.awaitTermination(timeout/2, unit))
System.err.println("Pool did not terminate");
}
} catch (InterruptedException ie) {
// (Re-)Cancel if current thread also interrupted
pool.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
}
}
概念部分來自http://ziyue1987.github.io/pages/2013/09/22/disruptor-use-manual.html ,如果想對這個框架有更一步了解,可以點進去看看,可以參考源代碼。