Disruptor深入解讀


將系統性能優化到極致,永遠是程序愛好者所努力的一個方向。在java並發領域,也有很多的實踐與創新,小到樂觀鎖、CAS,大到netty線程模型、纖程Quasar、kilim等。Disruptor是一個輕量的高性能並發框架,以驚人的吞吐量而受到廣泛的關注。Disruptor為提高程序的並發性能,提供了很多新的思路,比如:

  1. 緩存行填充,消除偽共享;
  2. RingBuffer無鎖隊列設計;
  3. 預分配緩存對象,使用緩存的循環覆蓋取代緩存的新增刪除等;

下文將從源碼角度解析Disruptor的實現原理。

1 Disruptor術語

Disruptor有很多自身的概念,使得初學者看代碼會比較費勁。因此在深入Disruptor原理之前,需要先了解一下Disruptor主要的幾個核心類或接口。

  • Sequence: 采用緩存行填充的方式對long類型的一層包裝,用以代表事件的序號。通過unsafe的cas方法從而避免了鎖的開銷;
  • Sequencer: 生產者與緩存RingBuffer之間的橋梁。單生產者與多生產者分別對應於兩個實現SingleProducerSequencer與MultiProducerSequencer。Sequencer用於向RingBuffer申請空間,使用publish方法通過waitStrategy通知所有在等待可消費事件的SequenceBarrier;
  • WaitStrategy: WaitStrategy有多種實現,用以表示當無可消費事件時,消費者的等待策略;
  • SequenceBarrier: 消費者與緩存RingBuffer之間的橋梁。消費者並不直接訪問RingBuffer,從而能減少RingBuffer上的並發沖突;
  • EventProcessor: 事件處理器,是消費者線程池Executor的調度單元,是對事件處理EventHandler與異常處理ExceptionHandler等的一層封裝;
  • Event: 消費事件。Event的具體實現由用戶定義;
  • RingBuffer: 基於數組的緩存實現,也是創建sequencer與定義WaitStrategy的入口;
  • Disruptor: Disruptor的使用入口。持有RingBuffer、消費者線程池Executor、消費者集合ConsumerRepository等引用。

2 Disruptor源碼分析

2.1 Disruptor並發模型

並發領域的一個典型場景是生產者消費者模型,常規方式是使用queue作為生產者線程與消費者線程之間共享數據的方法,對於queue的讀寫避免不了讀寫鎖的競爭。Disruptor使用環形緩沖區RingBuffer作為共享數據的媒介。生產者通過Sequencer控制RingBuffer,以及喚醒等待事件的消費者,消費者通過SequenceBarrier監聽RingBuffer的可消費事件。考慮一個場景,一個生產者A與三個消費者B、C、D,同時D的事件處理需要B與C先完成。則該模型結構如下:

在這個結構下,每個消費者擁有各自獨立的事件序號Sequence,消費者之間不存在共享競態。SequenceBarrier1監聽RingBuffer的序號cursor,消費者B與C通過SequenceBarrier1等待可消費事件。SequenceBarrier2除了監聽cursor,同時也監聽B與C的序號Sequence,從而將最小的序號返回給消費者D,由此實現了D依賴B與C的邏輯。
RingBuffer是Disruptor高性能的一個亮點。RingBuffer就是一個大數組,事件以循環覆蓋的方式寫入。與常規RingBuffer擁有2個首尾指針的方式不同,Disruptor的RingBuffer只有一個指針(或稱序號),指向數組下一個可寫入的位置,該序號在Disruptor源碼中就是Sequencer中的cursor,由生產者通過Sequencer控制RingBuffer的寫入。為了避免未消費事件的寫入覆蓋,Sequencer需要監聽所有消費者的消息處理進度,也就是gatingSequences。RingBuffer通過這種方式實現了事件緩存的無鎖設計。
下面將通過分析源碼,來理解Disruptor的實現原理。

2.2 Disruptor類

Disruptor類是Disruptor框架的總入口,能用DSL的形式組織消費者之間的關系鏈,並提供獲取事件、發布事件等方法。它包含以下屬性:

private final RingBuffer<T> ringBuffer;
/**消費者事件處理線程池**/
private final Executor executor;
/**消費者集合**/
private final ConsumerRepository<T> consumerRepository = new ConsumerRepository<T>();
/**Disruptor是否啟動標示,只能啟動一次**/
private final AtomicBoolean started = new AtomicBoolean(false);
/**消費者事件異常處理方法**/
private ExceptionHandler<? super T> exceptionHandler = new ExceptionHandlerWrapper<T>();

實例化Disruptor的過程,就是實例化RingBuffer與消費線程池Executor的過程。除此之外,Disruptor類最重要的作用是注冊消費者,handleEventsWith方法。該方法有多套實現,而每一個消費者最終都會被包裝成EventProcessor。createEventProcessors是包裝消費者的重要函數。

EventHandlerGroup<T> createEventProcessors(final Sequence[] barrierSequences,
                                           final EventHandler<T>[] eventHandlers)
{
	checkNotStarted();
	//每個消費者有自己的事件序號Sequence
	final Sequence[] processorSequences = new Sequence[eventHandlers.length];   
	//消費者通過SequenceBarrier等待可消費事件
	final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);   	for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++)
	{
		final EventHandler<T> eventHandler = eventHandlers[i];
		//每個消費者都以BatchEventProcessor被調度
		final BatchEventProcessor<T> batchEventProcessor = new BatchEventProcessor<T>(ringBuffer, barrier, eventHandler);  
		if (exceptionHandler != null)
		{
			batchEventProcessor.setExceptionHandler(exceptionHandler);
		}
		consumerRepository.add(batchEventProcessor, eventHandler, barrier);
		processorSequences[i] = batchEventProcessor.getSequence();
	}
	
	if (processorSequences.length > 0)
	{
		consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
	}
	
	return new EventHandlerGroup<T>(this, consumerRepository, processorSequences);
}

從程序中可以看出,每個消費者都以BatchEventProcessor的形式被調度,也就是說,消費者的邏輯都在BatchEventProcessor。

2.3 EventProcessor

EventProcessor有兩個有操作邏輯的實現類,BatchEventProcessor與WorkProcessor,處理邏輯很相近,這邊僅分析BatchEventProcessor。
BatchEventProcessor的構造函數使用DataProvider,而不直接使用RingBuffer,可能是Disruptor考慮到留給用戶替換RingBuffer事件存儲的空間,畢竟RingBuffer是內存級的。
Disruptor啟動時,會調用每個消費者ConsumerInfo(在消費者集合ConsumerRepository中)的start方法,最終會運行到BatchEventProcessor的run方法。

@Override
public void run()
{
	if (!running.compareAndSet(false, true))
	{
		throw new IllegalStateException("Thread is already running");
	}
	sequenceBarrier.clearAlert();
	
	notifyStart();
	
	T event = null;
	// sequence.get()標示當前已經處理的序號
	long nextSequence = sequence.get() + 1L;
	try
	{
		while (true)
		{
			try
			{
				// sequenceBarrier最重要的作用,就是讓消費者等待下一個可用的序號
				// 可用序號可能會大於nextSequence,從而消費者可以一次處理多個事件
				// 如果該消費者同時也依賴了其他消費者,則會返回最小的那個
				final long availableSequence = sequenceBarrier.waitFor(nextSequence);
				if (nextSequence > availableSequence)
				{
					Thread.yield();
				}
				
				while (nextSequence <= availableSequence)
				{
					event = dataProvider.get(nextSequence);
					// eventHandler是用戶定義的事件消費邏輯
					eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
					nextSequence++;
				}
				
				// 跟蹤自己處理的事件
				sequence.set(availableSequence);
			}
			catch (final TimeoutException e)
			{
				notifyTimeout(sequence.get());
			}
			catch (final AlertException ex)
			{
				if (!running.get())
				{
					break;
				}
			}
			catch (final Throwable ex)
			{
				exceptionHandler.handleEventException(ex, nextSequence, event);
				sequence.set(nextSequence);
				nextSequence++;
			}
		}
	}
	finally
	{
		notifyShutdown();
		running.set(false);
	}
}

消費者的邏輯,就是在while循環中,不斷查詢可消費事件,並由用戶自定義的消費邏輯eventHandler進行處理。查詢可消費事件的邏輯在SequenceBarrier中。

2.4 SequenceBarrier

SequenceBarrier只有一個實現,ProcessingSequenceBarrier。下面是ProcessingSequenceBarrier的構造函數。

public ProcessingSequenceBarrier(final Sequencer sequencer,final WaitStrategy waitStrategy,final Sequence cursorSequence,final Sequence[] dependentSequences)
{
	// 生產者的ringBuffer控制器sequencer
	this.sequencer = sequencer;
	// 消費者等待可消費事件的策略
	this.waitStrategy = waitStrategy;
	// ringBuffer的cursor
	this.cursorSequence = cursorSequence;
	if (0 == dependentSequences.length)
	{
		dependentSequence = cursorSequence;
	}
	else
	{
	// 當依賴其他消費者時,dependentSequence就是其他消費者的序號
		dependentSequence = new FixedSequenceGroup(dependentSequences);
	}
}

消費者通過ProcessingSequenceBarrier的waitFor方法等待可消費序號,實際是調用WaitStrategy的waitFor方法。

2.5 WaitStrategy

WaitStrategy有6個實現類,用於代表6種不同的等待策略,比如阻塞策略、忙等策略等。這邊就僅分析一個阻塞策略BlockingWaitStrategy。

@Override
public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
	throws AlertException, InterruptedException
{
	long availableSequence;
	if ((availableSequence = cursorSequence.get()) < sequence)
	{
		lock.lock();
		try
		{
			// 如果ringBuffer的cursor小於需要的序號,也就是生產者沒有新的事件發出,則阻塞消費者線程,直到生產者通過Sequencer的publish方法喚醒消費者。
			while ((availableSequence = cursorSequence.get()) < sequence)
			{
				barrier.checkAlert();
				processorNotifyCondition.await();
			}
		}
		finally
		{
			lock.unlock();
		}
	}
	
	// 如果生產者新發布了事件,但是依賴的其他消費者還沒處理完,則等待所依賴的消費者先處理。在本文的例子中,就是等B與C先處理完,D才能處理事件。
	while ((availableSequence = dependentSequence.get()) < sequence)
	{
		barrier.checkAlert();
	}
	
	return availableSequence;
}

到這里,消費者的程序邏輯也就基本都清楚了。最后再看一下生產者的程序邏輯,主要是Sequencer。

2.6 Sequencer

Sequencer負責生產者對RingBuffer的控制,包括查詢是否有寫入空間、申請空間、發布事件並喚醒消費者等。Sequencer有兩個實現SingleProducerSequencer與MultiProducerSequencer,分別對應於單生產者模型與多生產者模型。只要看懂hasAvailableCapacity(),申請空間也就明白了。下面是SingleProducerSequencer的hasAvailableCapacity實現。

@Override
public boolean hasAvailableCapacity(final int requiredCapacity)
{
	long nextValue = pad.nextValue;
	// wrapPoint是一個臨界序號,必須比當前最小的未消費序號還小
	long wrapPoint = (nextValue + requiredCapacity) - bufferSize;
	// 當前的最小未消費序號
	long cachedGatingSequence = pad.cachedValue;
	
	if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
	{
		long minSequence = Util.getMinimumSequence(gatingSequences, nextValue);
		pad.cachedValue = minSequence;
		
		if (wrapPoint > minSequence)
		{
			return false;
		}
	}
	return true;
}

3 Disruptor實例

本實例基於3.2.0版本的Disruptor,實現2.1小結描述的並發場景。使用Disruptor的過程非常簡單,只需要簡單的幾步。
定義用戶事件:

public class MyEvent {
	private long value;
	
	public MyEvent(){}
	
	public long getValue() {
		return value;
	}
	
	public void setValue(long value) {
		this.value = value;
	}
}

定義事件工廠,這是實例化Disruptor所需要的:

public class MyEventFactory implements EventFactory<MyEvent> {
	public MyEvent newInstance() {
		return new MyEvent();
	}
}

定義消費者B、C、D:

public class MyEventHandlerB implements EventHandler<MyEvent> {
	public void onEvent(MyEvent myEvent, long l, boolean b) throws Exception {
		System.out.println("Comsume Event B : " + myEvent.getValue());
	}
}

public class MyEventHandlerC implements EventHandler<MyEvent> {
	public void onEvent(MyEvent myEvent, long l, boolean b) throws Exception {
		System.out.println("Comsume Event C : " + myEvent.getValue());
	}
}

public class MyEventHandlerD implements EventHandler<MyEvent> {
	public void onEvent(MyEvent myEvent, long l, boolean b) throws Exception {
		System.out.println("Comsume Event D : " + myEvent.getValue());
	}
}

在此基礎上,就可以運行Disruptor了:

public static void main(String[] args){
	EventFactory<MyEvent> myEventFactory = new MyEventFactory();
	Executor executor = Executors.newCachedThreadPool();
	int ringBufferSize = 32;
	
	Disruptor<MyEvent> disruptor = new Disruptor<MyEvent>(myEventFactory,ringBufferSize,executor, ProducerType.SINGLE,new BlockingWaitStrategy());
	EventHandler<MyEvent> b = new MyEventHandlerB();
	EventHandler<MyEvent> c = new MyEventHandlerC();
	EventHandler<MyEvent> d = new MyEventHandlerD();
	
	SequenceBarrier sequenceBarrier2 = disruptor.handleEventsWith(b,c).asSequenceBarrier();
	BatchEventProcessor processord = new BatchEventProcessor(disruptor.getRingBuffer(),sequenceBarrier2,d);
	disruptor.handleEventsWith(processord);
//  disruptor.after(b,c).handleEventsWith(d);              // 此行能代替上兩行的程序邏輯
	RingBuffer<MyEvent> ringBuffer = disruptor.start();    // 啟動Disruptor
	for(int i=0; i<10; i++) {
		long sequence = ringBuffer.next();                 // 申請位置
		try {
			MyEvent myEvent = ringBuffer.get(sequence);
			myEvent.setValue(i);                           // 放置數據
		} finally {
			ringBuffer.publish(sequence);                  // 提交,如果不提交完成事件會一直阻塞
		}
		try{
			Thread.sleep(100);
		}catch (Exception e){
		}
	}
	disruptor.shutdown();
}

按照程序的邏輯,B與C會率先處理ringBuffer中的事件,且處理順序不分先后。同一事件被B與C處理完成之后,才會被D處理,結果如下:

Comsume Event C : 0
Comsume Event B : 0
Comsume Event D : 0
Comsume Event C : 1
Comsume Event B : 1
Comsume Event D : 1
Comsume Event C : 2
Comsume Event B : 2
Comsume Event D : 2
Comsume Event C : 3
Comsume Event B : 3
Comsume Event D : 3
Comsume Event C : 4
Comsume Event B : 4
Comsume Event D : 4
Comsume Event C : 5
Comsume Event B : 5
Comsume Event D : 5
Comsume Event C : 6
Comsume Event B : 6
Comsume Event D : 6
Comsume Event C : 7
Comsume Event B : 7
Comsume Event D : 7
Comsume Event C : 8
Comsume Event B : 8
Comsume Event D : 8
Comsume Event C : 9
Comsume Event B : 9
Comsume Event D : 9

將本例中的Thread.sleep去掉,即可以觀察到B與C的處理不分先后,結果符合預期。

本文乃作者原創,轉載請注明出處。http://www.cnblogs.com/miao-rui/p/6379473.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM