將系統性能優化到極致,永遠是程序愛好者所努力的一個方向。在java並發領域,也有很多的實踐與創新,小到樂觀鎖、CAS,大到netty線程模型、纖程Quasar、kilim等。Disruptor是一個輕量的高性能並發框架,以驚人的吞吐量而受到廣泛的關注。Disruptor為提高程序的並發性能,提供了很多新的思路,比如:
- 緩存行填充,消除偽共享;
- RingBuffer無鎖隊列設計;
- 預分配緩存對象,使用緩存的循環覆蓋取代緩存的新增刪除等;
下文將從源碼角度解析Disruptor的實現原理。
1 Disruptor術語
Disruptor有很多自身的概念,使得初學者看代碼會比較費勁。因此在深入Disruptor原理之前,需要先了解一下Disruptor主要的幾個核心類或接口。
- Sequence: 采用緩存行填充的方式對long類型的一層包裝,用以代表事件的序號。通過unsafe的cas方法從而避免了鎖的開銷;
- Sequencer: 生產者與緩存RingBuffer之間的橋梁。單生產者與多生產者分別對應於兩個實現SingleProducerSequencer與MultiProducerSequencer。Sequencer用於向RingBuffer申請空間,使用publish方法通過waitStrategy通知所有在等待可消費事件的SequenceBarrier;
- WaitStrategy: WaitStrategy有多種實現,用以表示當無可消費事件時,消費者的等待策略;
- SequenceBarrier: 消費者與緩存RingBuffer之間的橋梁。消費者並不直接訪問RingBuffer,從而能減少RingBuffer上的並發沖突;
- EventProcessor: 事件處理器,是消費者線程池Executor的調度單元,是對事件處理EventHandler與異常處理ExceptionHandler等的一層封裝;
- Event: 消費事件。Event的具體實現由用戶定義;
- RingBuffer: 基於數組的緩存實現,也是創建sequencer與定義WaitStrategy的入口;
- Disruptor: Disruptor的使用入口。持有RingBuffer、消費者線程池Executor、消費者集合ConsumerRepository等引用。
2 Disruptor源碼分析
2.1 Disruptor並發模型
並發領域的一個典型場景是生產者消費者模型,常規方式是使用queue作為生產者線程與消費者線程之間共享數據的方法,對於queue的讀寫避免不了讀寫鎖的競爭。Disruptor使用環形緩沖區RingBuffer作為共享數據的媒介。生產者通過Sequencer控制RingBuffer,以及喚醒等待事件的消費者,消費者通過SequenceBarrier監聽RingBuffer的可消費事件。考慮一個場景,一個生產者A與三個消費者B、C、D,同時D的事件處理需要B與C先完成。則該模型結構如下:
在這個結構下,每個消費者擁有各自獨立的事件序號Sequence,消費者之間不存在共享競態。SequenceBarrier1監聽RingBuffer的序號cursor,消費者B與C通過SequenceBarrier1等待可消費事件。SequenceBarrier2除了監聽cursor,同時也監聽B與C的序號Sequence,從而將最小的序號返回給消費者D,由此實現了D依賴B與C的邏輯。
RingBuffer是Disruptor高性能的一個亮點。RingBuffer就是一個大數組,事件以循環覆蓋的方式寫入。與常規RingBuffer擁有2個首尾指針的方式不同,Disruptor的RingBuffer只有一個指針(或稱序號),指向數組下一個可寫入的位置,該序號在Disruptor源碼中就是Sequencer中的cursor,由生產者通過Sequencer控制RingBuffer的寫入。為了避免未消費事件的寫入覆蓋,Sequencer需要監聽所有消費者的消息處理進度,也就是gatingSequences。RingBuffer通過這種方式實現了事件緩存的無鎖設計。
下面將通過分析源碼,來理解Disruptor的實現原理。
2.2 Disruptor類
Disruptor類是Disruptor框架的總入口,能用DSL的形式組織消費者之間的關系鏈,並提供獲取事件、發布事件等方法。它包含以下屬性:
private final RingBuffer<T> ringBuffer;
/**消費者事件處理線程池**/
private final Executor executor;
/**消費者集合**/
private final ConsumerRepository<T> consumerRepository = new ConsumerRepository<T>();
/**Disruptor是否啟動標示,只能啟動一次**/
private final AtomicBoolean started = new AtomicBoolean(false);
/**消費者事件異常處理方法**/
private ExceptionHandler<? super T> exceptionHandler = new ExceptionHandlerWrapper<T>();
實例化Disruptor的過程,就是實例化RingBuffer與消費線程池Executor的過程。除此之外,Disruptor類最重要的作用是注冊消費者,handleEventsWith方法。該方法有多套實現,而每一個消費者最終都會被包裝成EventProcessor。createEventProcessors是包裝消費者的重要函數。
EventHandlerGroup<T> createEventProcessors(final Sequence[] barrierSequences,
final EventHandler<T>[] eventHandlers)
{
checkNotStarted();
//每個消費者有自己的事件序號Sequence
final Sequence[] processorSequences = new Sequence[eventHandlers.length];
//消費者通過SequenceBarrier等待可消費事件
final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences); for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++)
{
final EventHandler<T> eventHandler = eventHandlers[i];
//每個消費者都以BatchEventProcessor被調度
final BatchEventProcessor<T> batchEventProcessor = new BatchEventProcessor<T>(ringBuffer, barrier, eventHandler);
if (exceptionHandler != null)
{
batchEventProcessor.setExceptionHandler(exceptionHandler);
}
consumerRepository.add(batchEventProcessor, eventHandler, barrier);
processorSequences[i] = batchEventProcessor.getSequence();
}
if (processorSequences.length > 0)
{
consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
}
return new EventHandlerGroup<T>(this, consumerRepository, processorSequences);
}
從程序中可以看出,每個消費者都以BatchEventProcessor的形式被調度,也就是說,消費者的邏輯都在BatchEventProcessor。
2.3 EventProcessor
EventProcessor有兩個有操作邏輯的實現類,BatchEventProcessor與WorkProcessor,處理邏輯很相近,這邊僅分析BatchEventProcessor。
BatchEventProcessor的構造函數使用DataProvider,而不直接使用RingBuffer,可能是Disruptor考慮到留給用戶替換RingBuffer事件存儲的空間,畢竟RingBuffer是內存級的。
Disruptor啟動時,會調用每個消費者ConsumerInfo(在消費者集合ConsumerRepository中)的start方法,最終會運行到BatchEventProcessor的run方法。
@Override
public void run()
{
if (!running.compareAndSet(false, true))
{
throw new IllegalStateException("Thread is already running");
}
sequenceBarrier.clearAlert();
notifyStart();
T event = null;
// sequence.get()標示當前已經處理的序號
long nextSequence = sequence.get() + 1L;
try
{
while (true)
{
try
{
// sequenceBarrier最重要的作用,就是讓消費者等待下一個可用的序號
// 可用序號可能會大於nextSequence,從而消費者可以一次處理多個事件
// 如果該消費者同時也依賴了其他消費者,則會返回最小的那個
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
if (nextSequence > availableSequence)
{
Thread.yield();
}
while (nextSequence <= availableSequence)
{
event = dataProvider.get(nextSequence);
// eventHandler是用戶定義的事件消費邏輯
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
nextSequence++;
}
// 跟蹤自己處理的事件
sequence.set(availableSequence);
}
catch (final TimeoutException e)
{
notifyTimeout(sequence.get());
}
catch (final AlertException ex)
{
if (!running.get())
{
break;
}
}
catch (final Throwable ex)
{
exceptionHandler.handleEventException(ex, nextSequence, event);
sequence.set(nextSequence);
nextSequence++;
}
}
}
finally
{
notifyShutdown();
running.set(false);
}
}
消費者的邏輯,就是在while循環中,不斷查詢可消費事件,並由用戶自定義的消費邏輯eventHandler進行處理。查詢可消費事件的邏輯在SequenceBarrier中。
2.4 SequenceBarrier
SequenceBarrier只有一個實現,ProcessingSequenceBarrier。下面是ProcessingSequenceBarrier的構造函數。
public ProcessingSequenceBarrier(final Sequencer sequencer,final WaitStrategy waitStrategy,final Sequence cursorSequence,final Sequence[] dependentSequences)
{
// 生產者的ringBuffer控制器sequencer
this.sequencer = sequencer;
// 消費者等待可消費事件的策略
this.waitStrategy = waitStrategy;
// ringBuffer的cursor
this.cursorSequence = cursorSequence;
if (0 == dependentSequences.length)
{
dependentSequence = cursorSequence;
}
else
{
// 當依賴其他消費者時,dependentSequence就是其他消費者的序號
dependentSequence = new FixedSequenceGroup(dependentSequences);
}
}
消費者通過ProcessingSequenceBarrier的waitFor方法等待可消費序號,實際是調用WaitStrategy的waitFor方法。
2.5 WaitStrategy
WaitStrategy有6個實現類,用於代表6種不同的等待策略,比如阻塞策略、忙等策略等。這邊就僅分析一個阻塞策略BlockingWaitStrategy。
@Override
public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
throws AlertException, InterruptedException
{
long availableSequence;
if ((availableSequence = cursorSequence.get()) < sequence)
{
lock.lock();
try
{
// 如果ringBuffer的cursor小於需要的序號,也就是生產者沒有新的事件發出,則阻塞消費者線程,直到生產者通過Sequencer的publish方法喚醒消費者。
while ((availableSequence = cursorSequence.get()) < sequence)
{
barrier.checkAlert();
processorNotifyCondition.await();
}
}
finally
{
lock.unlock();
}
}
// 如果生產者新發布了事件,但是依賴的其他消費者還沒處理完,則等待所依賴的消費者先處理。在本文的例子中,就是等B與C先處理完,D才能處理事件。
while ((availableSequence = dependentSequence.get()) < sequence)
{
barrier.checkAlert();
}
return availableSequence;
}
到這里,消費者的程序邏輯也就基本都清楚了。最后再看一下生產者的程序邏輯,主要是Sequencer。
2.6 Sequencer
Sequencer負責生產者對RingBuffer的控制,包括查詢是否有寫入空間、申請空間、發布事件並喚醒消費者等。Sequencer有兩個實現SingleProducerSequencer與MultiProducerSequencer,分別對應於單生產者模型與多生產者模型。只要看懂hasAvailableCapacity(),申請空間也就明白了。下面是SingleProducerSequencer的hasAvailableCapacity實現。
@Override
public boolean hasAvailableCapacity(final int requiredCapacity)
{
long nextValue = pad.nextValue;
// wrapPoint是一個臨界序號,必須比當前最小的未消費序號還小
long wrapPoint = (nextValue + requiredCapacity) - bufferSize;
// 當前的最小未消費序號
long cachedGatingSequence = pad.cachedValue;
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
{
long minSequence = Util.getMinimumSequence(gatingSequences, nextValue);
pad.cachedValue = minSequence;
if (wrapPoint > minSequence)
{
return false;
}
}
return true;
}
3 Disruptor實例
本實例基於3.2.0版本的Disruptor,實現2.1小結描述的並發場景。使用Disruptor的過程非常簡單,只需要簡單的幾步。
定義用戶事件:
public class MyEvent {
private long value;
public MyEvent(){}
public long getValue() {
return value;
}
public void setValue(long value) {
this.value = value;
}
}
定義事件工廠,這是實例化Disruptor所需要的:
public class MyEventFactory implements EventFactory<MyEvent> {
public MyEvent newInstance() {
return new MyEvent();
}
}
定義消費者B、C、D:
public class MyEventHandlerB implements EventHandler<MyEvent> {
public void onEvent(MyEvent myEvent, long l, boolean b) throws Exception {
System.out.println("Comsume Event B : " + myEvent.getValue());
}
}
public class MyEventHandlerC implements EventHandler<MyEvent> {
public void onEvent(MyEvent myEvent, long l, boolean b) throws Exception {
System.out.println("Comsume Event C : " + myEvent.getValue());
}
}
public class MyEventHandlerD implements EventHandler<MyEvent> {
public void onEvent(MyEvent myEvent, long l, boolean b) throws Exception {
System.out.println("Comsume Event D : " + myEvent.getValue());
}
}
在此基礎上,就可以運行Disruptor了:
public static void main(String[] args){
EventFactory<MyEvent> myEventFactory = new MyEventFactory();
Executor executor = Executors.newCachedThreadPool();
int ringBufferSize = 32;
Disruptor<MyEvent> disruptor = new Disruptor<MyEvent>(myEventFactory,ringBufferSize,executor, ProducerType.SINGLE,new BlockingWaitStrategy());
EventHandler<MyEvent> b = new MyEventHandlerB();
EventHandler<MyEvent> c = new MyEventHandlerC();
EventHandler<MyEvent> d = new MyEventHandlerD();
SequenceBarrier sequenceBarrier2 = disruptor.handleEventsWith(b,c).asSequenceBarrier();
BatchEventProcessor processord = new BatchEventProcessor(disruptor.getRingBuffer(),sequenceBarrier2,d);
disruptor.handleEventsWith(processord);
// disruptor.after(b,c).handleEventsWith(d); // 此行能代替上兩行的程序邏輯
RingBuffer<MyEvent> ringBuffer = disruptor.start(); // 啟動Disruptor
for(int i=0; i<10; i++) {
long sequence = ringBuffer.next(); // 申請位置
try {
MyEvent myEvent = ringBuffer.get(sequence);
myEvent.setValue(i); // 放置數據
} finally {
ringBuffer.publish(sequence); // 提交,如果不提交完成事件會一直阻塞
}
try{
Thread.sleep(100);
}catch (Exception e){
}
}
disruptor.shutdown();
}
按照程序的邏輯,B與C會率先處理ringBuffer中的事件,且處理順序不分先后。同一事件被B與C處理完成之后,才會被D處理,結果如下:
Comsume Event C : 0
Comsume Event B : 0
Comsume Event D : 0
Comsume Event C : 1
Comsume Event B : 1
Comsume Event D : 1
Comsume Event C : 2
Comsume Event B : 2
Comsume Event D : 2
Comsume Event C : 3
Comsume Event B : 3
Comsume Event D : 3
Comsume Event C : 4
Comsume Event B : 4
Comsume Event D : 4
Comsume Event C : 5
Comsume Event B : 5
Comsume Event D : 5
Comsume Event C : 6
Comsume Event B : 6
Comsume Event D : 6
Comsume Event C : 7
Comsume Event B : 7
Comsume Event D : 7
Comsume Event C : 8
Comsume Event B : 8
Comsume Event D : 8
Comsume Event C : 9
Comsume Event B : 9
Comsume Event D : 9
將本例中的Thread.sleep去掉,即可以觀察到B與C的處理不分先后,結果符合預期。
本文乃作者原創,轉載請注明出處。http://www.cnblogs.com/miao-rui/p/6379473.html