消費者如何讀取數據?
前一篇是生產者的處理,這一篇講消費者的處理
我們都知道,消費者無非就是不停地從隊列中讀取數據,處理數據。但是與BlockedQueue不同的是,RingBuffer的消費者不會對隊列進行上鎖,那它是怎樣實現的呢?
概括地說,就是通過CAS原子性地得到一個可消費的序號,然后再根據序號取出數據進行處理。
在看代碼之前,我們先把能想到的東西先羅列一下:
1.需要一個尾指針來追蹤消費狀態
2.如何防止一個數據被多個消費者重復消費?
3.消費速度不能超過生產者,如何限制?
4.當沒有可處理數據的時候消費者該做什么,自旋還是掛起等待生產者喚醒?
5.如果4選擇掛起,那么如果RingBuffer關閉,如何喚醒消費者以終結線程任務?
6.RingBuffer構造的時候需要傳入線程工廠,RingBuffer是如何使用線程的,多個任務使用一個線程調度?
7.消費者何時啟動?
好,問題有了,現在我們來看代碼,下面是EventProcessor的一個實現,WorkProcessor的部分代碼。
public final class WorkProcessor<T> implements EventProcessor { private final AtomicBoolean running = new AtomicBoolean(false); //當前處理器狀態 private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); //當前已消費過的最新序號 private final RingBuffer<T> ringBuffer; //保留此引用,方便取數據 private final SequenceBarrier sequenceBarrier; //用於等待下一個最大可用序號,可與多個Processor共用 private final WorkHandler<? super T> workHandler; //實際上的處理器 private final ExceptionHandler<? super T> exceptionHandler; private final Sequence workSequence; //多個Processor共用的workSequence,可以得到下一個待處理的序號 //.... @Override public void run() { if (!running.compareAndSet(false, true)) //防止run方法重復調用造成的問題 { throw new IllegalStateException("Thread is already running"); } sequenceBarrier.clearAlert(); notifyStart(); boolean processedSequence = true; long cachedAvailableSequence = Long.MIN_VALUE; long nextSequence = sequence.get(); T event = null; while (true) //死循環 { try { if (processedSequence) { if (!running.get()) //如果檢查到已關閉,則喚醒在同一個Barrier上的其他processor線程 { sequenceBarrier.alert(); //喚醒其他線程 sequenceBarrier.checkAlert(); //拋出異常,終結此線程 } processedSequence = false; do { //workSequence可能和多個Processor共用 nextSequence = workSequence.get() + 1L; //這個sequence才是當前處理器處理過的序號,生產者判斷尾指針的時候就是按照這個來的,這個就是gatingSequence //拿到下一個新序號的時候,說明workSequence前一個數據已經處理過了 sequence.set(nextSequence - 1L); } //由於workSequence可能由多個Processor共用,故存在競爭情況,需要使用CAS while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence)); } //如果沒有超過上一次緩存生產者的最大序號,則表明數據可取 if (cachedAvailableSequence >= nextSequence) { //取出序號對應位置的數據 event = ringBuffer.get(nextSequence); //交給handler處理 workHandler.onEvent(event); processedSequence = true; } else { //阻塞等待下一個可用的序號 //如果就是nextSequence,就返回nextSequence //如果可用的大於nextSequence,則返回最新可用的sequence cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence); } } catch (final TimeoutException e) { notifyTimeout(sequence.get()); } catch (final AlertException ex) //checkAlert()拋出的 { if (!running.get()) //如果已經結束,則終結循環,線程任務結束 { break; } } catch (final Throwable ex) //其他異常,則交給異常處理器處理 { // handle, mark as processed, unless the exception handler threw an exception exceptionHandler.handleEventException(ex, nextSequence, event); processedSequence = true; } } notifyShutdown(); running.set(false); } //... }
針對問題一:需要一個尾指針來追蹤消費狀態
你們注意到代碼中有兩個Sequence,workSequence和sequence。為啥需要兩個呢?
workSequence消費者使用的最新序號(該序號的數據未被處理過,只是被消費者標記成可消費);而sequence序號的數據則是被消費過的,這個序號正是前一篇中的gatingSequence。

針對問題二:如何防止一個數據被多個消費者重復消費?
問題二的解決方案就是WorkPool,即讓多個WorkProcessor共用一個workSequence,這樣它們就會競爭序號,一個序號只能被消費一次。

public final class WorkerPool<T> { private final AtomicBoolean started = new AtomicBoolean(false); private final Sequence workSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); //從-1開始 private final RingBuffer<T> ringBuffer; //RingBuffer引用,用於構造Processor,取數據 private final WorkProcessor<?>[] workProcessors; //... public WorkerPool( final RingBuffer<T> ringBuffer, final SequenceBarrier sequenceBarrier, final ExceptionHandler<? super T> exceptionHandler, final WorkHandler<? super T>... workHandlers) { this.ringBuffer = ringBuffer; final int numWorkers = workHandlers.length; workProcessors = new WorkProcessor[numWorkers]; //每個handler構造一個Processor for (int i = 0; i < numWorkers; i++) { workProcessors[i] = new WorkProcessor<>( ringBuffer, sequenceBarrier, //共用同一個sequenceBarrier workHandlers[i], exceptionHandler, workSequence); //共用同一個workSequence } } //... } public class Disruptor<T> { //... //為每個WorkHandler構造一個WorkProcessor,再包裝成一個WorkerPool public final EventHandlerGroup<T> handleEventsWithWorkerPool(final WorkHandler<T>... workHandlers) { return createWorkerPool(new Sequence[0], workHandlers); } //.... }
針對問題三、四:消費速度不能超過生產者,如何限制?當沒有可處理數據的時候消費者該做什么,自旋還是掛起等待生產者喚醒?
使用SequenceBarrier,從WorkProcessor的代碼中我們可以知道,消費者會緩存上次獲取的最大可消費序號,然后在這序號范圍內都可以直接競爭。每次獲取最小可用序號的時候,則會觸發waitStrategy等待策略進行等待。

final class ProcessingSequenceBarrier implements SequenceBarrier { private final WaitStrategy waitStrategy; //等待策略 private final Sequence dependentSequence; //依賴的序號,默認為RingBuffer的sequence private volatile boolean alerted = false; private final Sequence cursorSequence; //RingBuffer的sequence private final Sequencer sequencer; //... public long waitFor(final long sequence) throws AlertException, InterruptedException, TimeoutException { checkAlert(); //如果已shutdown,則拋出異常,終結任務 //sequence為消費者想要的下一個序號 //cursorSequence為RingBuffer的序號(生產者最新序號) //dependentSequence默認就是cursorSequence //特殊情況下,例如消費者B要求只能消費消費者A消費過的,則dependentSequence就會是消費者A的sequence long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this); if (availableSequence < sequence) { return availableSequence; } //得到的序號是生產者用過的序號,但是該序號對應的數據可能未發布,如果訪問未發布的數據,就會影響正確性,因為可能該數據還處於translate階段 return sequencer.getHighestPublishedSequence(sequence, availableSequence); } //... }
其中等待策略有很多中,常見的就是BlockingWaitStategy,該等待策略會掛起執行線程。當生產者publishEvent的時候,則會調用WaitStrategy#signalAllWhenBlocking()方法喚醒所有等待線程。
public final class BlockingWaitStrategy implements WaitStrategy { private final Object mutex = new Object(); //使用對象內置的條件隊列 @Override public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier) throws AlertException, InterruptedException { long availableSequence; if (cursorSequence.get() < sequence) //當生產者序號小於消費者需要的序號時,掛起等待喚醒 { synchronized (mutex) { while (cursorSequence.get() < sequence) //使用while是為了防止被錯誤喚醒,所以被喚醒后還會再判斷條件是否滿足 { barrier.checkAlert(); mutex.wait(); } } } //生產者序號滿足后,查看依賴項是否滿足 //如果依賴的消費者的序號小於需求序號,即依賴的消費者還沒消費過需求序號 //則自旋等待 while ((availableSequence = dependentSequence.get()) < sequence) { barrier.checkAlert(); ThreadHints.onSpinWait(); } return availableSequence; } @Override public void signalAllWhenBlocking() //開設接口,用於喚醒條件隊列內的等待線程 { synchronized (mutex) { mutex.notifyAll(); } } @Override public String toString() { return "BlockingWaitStrategy{" + "mutex=" + mutex + '}'; } }
針對問題六、七:RingBuffer構造的時候需要傳入線程工廠,RingBuffer是如何使用線程的,多個任務使用一個線程調度?消費者何時啟動?
消費者隨Disruptor啟動,Disruptor啟動時會從ConsumerRepository中取出Consumer,提交給Executor執行。
public RingBuffer<T> start() { checkOnlyStartedOnce(); for (final ConsumerInfo consumerInfo : consumerRepository) { consumerInfo.start(executor); } return ringBuffer; }
其中,在新版的Disruptor中,不建議使用外部傳入的Executor,而是只傳ThreadFactory,然后由內部構造一個Executor,就是BasicExecutor。它的實現就是每次提交的任務都創建一個新的線程負責。所以它的線程模型就是一個消費者一個線程。
public class Disruptor<T> { //... public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final ThreadFactory threadFactory) { this(RingBuffer.createMultiProducer(eventFactory, ringBufferSize), new BasicExecutor(threadFactory)); } //... } public class BasicExecutor implements Executor { private final ThreadFactory factory; private final Queue<Thread> threads = new ConcurrentLinkedQueue<>(); public BasicExecutor(ThreadFactory factory) { this.factory = factory; } @Override public void execute(Runnable command) { //每提交一個任務就新建一個新的線程處理這個任務 final Thread thread = factory.newThread(command); if (null == thread) { throw new RuntimeException("Failed to create thread to run: " + command); } thread.start(); threads.add(thread); } //... }
