Storm-源碼分析- Disruptor在storm中的使用


Disruptor 2.0, (http://ifeve.com/disruptor-2-change/)

Disruptor為了更便於使用, 在2.0做了比較大的調整, 比較突出的是更換了幾乎所有的概念名

老版本,

image

新版本,

image

從左到右的變化如下,

1. Producer –> Publisher
2. ProducerBarrier被integrate到RingBuffer里面, 叫做PublishPort, 提供publish接口
3. Entry –> Event
4, Cursor封裝成Sequence, 其實Sequence就是將cursor+pading封裝一下
5. Consumer –> EventProcesser
6. ConsumerBarrier 變為DependencyBarrier, 或SequenceBarrier

並且對於publisher和EventProcesser, 存在ClaimStrategy和WaitStrategy
對於publisher的ClaimStrategy, 由於publisher需要先claim到sequencer才能publish: SingleThreadedClaimStrategy, MultiThreadedClaimStrategy, 應該是對於singlethread不需要使用CAS更為高效
對於EventProcesser的WaitStrategy, 當取不到數據的時候采用什么樣的策略進行等待: BlockingWaitStrategy, BusySpinWaitStrategy, SleepingWaitStrategy, YieldingWaitStrategy
Blocking就是同步加鎖, BusySpin就是忙等耗CPU, 都比較低效
Yielding就是調用thread.yeild(), 把線程的從可執行狀態調整成就緒裝, 意思我先息下, 你們忙你們先來, 就是把CPU讓給其他的線程, 但是yeild並不保證過多久線程被執行, 如果沒有其他線程, 可能會被立即執行
而sleep, 會強制線程休眠指定時間, 然后再重新調度

 

DisruptorQueue.java

    static final Object FLUSH_CACHE = new Object(); //特殊對象, 當consumer取到時, 觸發cache queue的flush
    static final Object INTERRUPT = new Object(); //特殊對象, 當consumer取到時, 觸發InterruptedException
    
    RingBuffer<MutableObject> _buffer; //Disruptor的主要的數據結構RingBuffer
    Sequence _consumer; //consumer讀取序號
    SequenceBarrier _barrier; //用於consumer監聽RingBuffer的序號情況
    
    // TODO: consider having a threadlocal cache of this variable to speed up reads?
    volatile boolean consumerStartedFlag = false; //標志consumer是否start, 由於需要在change后其他線程可以馬上知道, 所以使用volatile
    ConcurrentLinkedQueue<Object> _cache = new ConcurrentLinkedQueue(); //當consumer沒有start的時候, cache event的queue

ConcurrentLinkedQueue, 使用CAS而非lock來實現的線程安全隊列, 具體參考(http://blog.sina.com.cn/s/blog_5efa3473010129pj.html)

首先聲明一組變量, 部分會在構造函數中被初始化
最重要的結構就是RingBuffer, 這是個模板類, 這里從ObjectEventFactory()的實現也可以看出來, 初始化的時候在ringbuffer的每個entry上都創建一個MutableObject對象

MutableObject的實現很簡單, 這是封裝了object o, 為什么要做這層封裝?
為了避免Java GC, 對於RingBuffer一旦初始化好, 上面的所有的MutableObject都不會被釋放, 你只是去對object o, set不同的值

_buffer = new RingBuffer<MutableObject>(new ObjectEventFactory(), claim, wait);
public static class ObjectEventFactory implements EventFactory<MutableObject> {
    @Override
    public MutableObject newInstance() {
        return new MutableObject();
    }        
}
public class MutableObject {
    Object o = null;
}


Publish

Publish過程, 可見當前ProducerBarrier已經被集成到RingBuffer里面, 所以直接調用_buffer的接口
首先調用next, claim序號
取出序號上的MutableObject, 並將輸入obj set
最后, publish當前序號, 表示consumer可以讀取
當consumer沒有start時, 會將obj cache在_cache中, 而不會放到ringbuffer中 (我沒有想明白why? 為何要使用低效的鏈表queue來cache, 而不直接放到ringbuffer里面)

    public void publish(Object obj, boolean block) throws InsufficientCapacityException {
        if(consumerStartedFlag) {
            final long id;
            if(block) {
                id = _buffer.next();
            } else {
                id = _buffer.tryNext(1);
            }
            final MutableObject m = _buffer.get(id);
            m.setObject(obj);
            _buffer.publish(id);
        } else {
            _cache.add(obj);
            if(consumerStartedFlag) flushCache();
        }
    }

Consume

consume的過程, 這里實現的時Batch consume, 即給定Cursor, 會一直consume到該cursor為止

_consumer代表當前已經被consume的序號, 所以從_consumer.get() + 1開始讀
取出MutableObject中的o, 並將MutableObject 清空
根據o的情況, 3種情況,
    1. 如果是FLUSH_CACHE對象, 將cache中的event讀出調用handler.onEvent
    2. 如果是INTERRUPT對象, 觸發InterruptedException
    3. 正常情況, 直接調用handler.onEvent處理該o, curr == cursor判斷表示batch是否結束, 當讀到cursor的時候結束

最終將_consumer置為cursor, 表示已經讀到cursor位置

    private void consumeBatchToCursor(long cursor, EventHandler<Object> handler) {
        for(long curr = _consumer.get() + 1; curr <= cursor; curr++) {
            try {
                MutableObject mo = _buffer.get(curr);
                Object o = mo.o;
                mo.setObject(null);
                if(o==FLUSH_CACHE) {
                    Object c = null;
                    while(true) {                        
                        c = _cache.poll();
                        if(c==null) break;
                        else handler.onEvent(c, curr, true);
                    }
                } else if(o==INTERRUPT) {
                    throw new InterruptedException("Disruptor processing interrupted");
                } else {
                    handler.onEvent(o, curr, curr == cursor);
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        //TODO: only set this if the consumer cursor has changed?
        _consumer.set(cursor);
    }

backtype.storm.disruptor.clj

創建DisruptorQueue, 選用MultiThreadedClaimStrategy和BlockingWaitStrategy

(defnk disruptor-queue [buffer-size :claim-strategy :multi-threaded :wait-strategy :block]
  (DisruptorQueue. ((CLAIM-STRATEGY claim-strategy) buffer-size)
                   (mk-wait-strategy wait-strategy)
                   ))

並封裝一系列Java接口
最重要的工作是, 啟動consume-loop
這里ret是closeover了一個間隔為0的不停執行(consume-batch-when-available queue handler) 的線程, 而consumeBatchWhenAvailable的實現就是不停的sleep並調用consumeBatchToCursor

並且通過consumer-started!通知其他線程consumer已經start

(defnk consume-loop* [^DisruptorQueue queue handler :kill-fn (fn [error] (halt-process! 1 "Async loop died!"))
                      :thread-name nil]
  (let [ret (async-loop
              (fn []
                (consume-batch-when-available queue handler)
                0 )
              :kill-fn kill-fn
              :thread-name thread-name
              )]
     (consumer-started! queue)
     ret
     ))

(defmacro consume-loop [queue & handler-args]
  `(let [handler# (handler ~@handler-args)]
     (consume-loop* ~queue handler#)
     ))


看看async-loop實現什么功能?
返回reify實現的record, 其中closeover了thread
這個thread主要就是死循環的執行傳入的afn, 並且以afn的返回值作為執行間隔

主要功能, 異步的loop, 開啟新的線程來執行loop, 而不是在當前主線程, 並且提供了sleep設置

;; afn returns amount of time to sleep
(defnk async-loop [afn
                   :daemon false
                   :kill-fn (fn [error] (halt-process! 1 "Async loop died!"))
                   :priority Thread/NORM_PRIORITY
                   :factory? false
                   :start true
                   :thread-name nil]
  (let [thread (Thread.
                (fn []
                  (try-cause
                    (let [afn (if factory? (afn) afn)]
                      (loop []
                        (let [sleep-time (afn)]
                          (when-not (nil? sleep-time)
                            (sleep-secs sleep-time)
                            (recur))
                          )))
                    (catch InterruptedException e
                      (log-message "Async loop interrupted!")
                      )
                    (catch Throwable t
                      (log-error t "Async loop died!")
                      (kill-fn t)
                      ))
                  ))]
    (.setDaemon thread daemon)
    (.setPriority thread priority)
    (when thread-name
      (.setName thread (str (.getName thread) "-" thread-name)))
    (when start
      (.start thread))
    ;; should return object that supports stop, interrupt, join, and waiting?
    (reify SmartThread
      (start [this]
        (.start thread))
      (join [this]
        (.join thread))
      (interrupt [this]
        (.interrupt thread))
      (sleeping? [this]
        (Time/isThreadWaiting thread)
        ))
      ))

 

Storm在Worker中executors線程間通信, 如何使用Disruptor的?

image

 

Understanding the Internal Message Buffers of Storm, 可以參考


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM