Storm-源碼分析-Topology Submit-Executor-mk-threads


 

對於executor thread是整個storm最為核心的代碼, 因為在這個thread里面真正完成了大部分工作, 而其他的如supervisor,worker都是封裝調用.

對於executor的mk-threads, 是通過mutilmethods對spout和bolt分別定義不同的邏輯

1. Spout Thread

(defmethod mk-threads :spout [executor-data task-datas]
  (let [{:keys [storm-conf component-id worker-context transfer-fn report-error sampler open-or-prepare-was-called?]} executor-data
        ;;1.1 定義pending
        ^ISpoutWaitStrategy spout-wait-strategy (init-spout-wait-strategy storm-conf)
        max-spout-pending (executor-max-spout-pending storm-conf (count task-datas))
        ^Integer max-spout-pending (if max-spout-pending (int max-spout-pending))        
        last-active (atom false)        
        spouts (ArrayList. (map :object (vals task-datas)))
        rand (Random. (Utils/secureRandomLong))        
        pending (RotatingMap.
                 2 ;; microoptimize for performance of .size method
                 (reify RotatingMap$ExpiredCallback
                   (expire [this msg-id [task-id spout-id tuple-info start-time-ms]]
                     (let [time-delta (if start-time-ms (time-delta-ms start-time-ms))] ;;start-time-ms是取樣賦值的,一般為null,只有有start-time-ms,才會產生time-delta
                       (fail-spout-msg executor-data (get task-datas task-id) spout-id tuple-info time-delta)
                       ))))
 
        ;;1.2 定義tuple-action-fn 
        tuple-action-fn (fn [task-id ^TupleImpl tuple]
                          (let [stream-id (.getSourceStreamId tuple)]
                            (condp = stream-id
                              Constants/SYSTEM_TICK_STREAM_ID (.rotate pending)
                              Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data task-datas tuple)
                              (let [id (.getValue tuple 0)      ;;tuple values, values[0]為id
                                    [stored-task-id spout-id tuple-finished-info start-time-ms] (.remove pending id)];;從pending中刪除tuple,重要!
                                (when spout-id
                                  (when-not (= stored-task-id task-id)
                                    (throw-runtime "Fatal error, mismatched task ids: " task-id "" stored-task-id))
                                  (let [time-delta (if start-time-ms (time-delta-ms start-time-ms))]
                                    (condp = stream-id
                                      ACKER-ACK-STREAM-ID (ack-spout-msg executor-data (get task-datas task-id)   ;;ack
                                                                         spout-id tuple-finished-info time-delta)
                                      ACKER-FAIL-STREAM-ID (fail-spout-msg executor-data (get task-datas task-id)  ;;fail
                                                                           spout-id tuple-finished-info time-delta)
                                      )))
                                ;; TODO: on failure, emit tuple to failure stream
                                ))))
        receive-queue (:receive-queue executor-data)    ;;取得receive disruptor queue
        event-handler (mk-task-receiver executor-data tuple-action-fn) ;;定義disruptor/clojure-handler, 使用tuple-action-fn處理從receive-queue里面得到的tuple
        has-ackers? (has-ackers? storm-conf)
        emitted-count (MutableLong. 0)
        empty-emit-streak (MutableLong. 0)
        
        ;; the overflow buffer is used to ensure that spouts never block when emitting
        ;; this ensures that the spout can always clear the incoming buffer (acks and fails), which
        ;; prevents deadlock from occuring across the topology (e.g. Spout -> Bolt -> Acker -> Spout, and all
        ;; buffers filled up)
        ;; when the overflow buffer is full, spouts stop calling nextTuple until it's able to clear the overflow buffer
        ;; this limits the size of the overflow buffer to however many tuples a spout emits in one call of nextTuple, 
        ;; preventing memory issues
        overflow-buffer (LinkedList.)]
     
    ;; 1.3 async-loop thread
    [(async-loop
      (fn []
        ;; If topology was started in inactive state, don't call (.open spout) until it's activated first.
        (while (not @(:storm-active-atom executor-data))
          (Thread/sleep 100))
        
        (log-message "Opening spout " component-id ":" (keys task-datas))
        (doseq [[task-id task-data] task-datas
                :let [^ISpout spout-obj (:object task-data)
                      tasks-fn (:tasks-fn task-data)
                      ;; 1.3.1 send-spout-msg 
                      send-spout-msg (fn [out-stream-id values message-id out-task-id]
                                       (.increment emitted-count)
                                       (let [out-tasks (if out-task-id
                                                         (tasks-fn out-task-id out-stream-id values)  ;;direct grouping
                                                         (tasks-fn out-stream-id values))   ;;調用grouper產生target tasks
                                             rooted? (and message-id has-ackers?)  ;;指定messageid並且有acker, 說明需要track該message, root?意思需要track的DAG的root
                                             root-id (if rooted? (MessageId/generateId rand)) ;;rand.nextLong, 隨機long, 產生root-id
                                             out-ids (fast-list-for [t out-tasks] (if rooted? (MessageId/generateId rand)))] ;;對於發送到的每個task, 產生一個out-id(out-edgeid)
                                         (fast-list-iter [out-task out-tasks id out-ids]
                                                         (let [tuple-id (if rooted?
                                                                          (MessageId/makeRootId root-id id);;返回包含hashmap{root-id, out-id}的MessageId對象
                                                                          (MessageId/makeUnanchored))  ;;返回包含hashmap{}的MessageId對象 
                                                               out-tuple (TupleImpl. worker-context   ;;生成tuple對象
                                                                                     values
                                                                                     task-id
                                                                                     out-stream-id
                                                                                     tuple-id)]
                                                           (transfer-fn out-task      ;;調用executor->transfer-fn將tuple發送到spout的發送queue
                                                                        out-tuple
                                                                        overflow-buffer)))
                                         (if rooted?
                                           (do   ;;如果需要跟蹤
(.put pending root-id [task-id  ;;往pending queue增加需要track的tuple信息
                                                                    message-id
                                                                    {:stream out-stream-id :values values}
                                                                    (if (sampler) (System/currentTimeMillis))]) ;;只有sampler為true, 才會設置starttime,后面才會更新metrics和stats
                                             (task/send-unanchored task-data  ;;往ACKER-INIT-STREAM發送message, 告訴acker track該message 
                                                                   ACKER-INIT-STREAM-ID
                                                                   [root-id (bit-xor-vals out-ids) task-id]
                                                                   overflow-buffer))
                                           (when message-id  ;;rooted?為false, 而有message-id, 意味着沒有acker(has-ackers?為false)
                                             (ack-spout-msg executor-data task-data message-id  ;;既然沒有acker, 就直接ack
                                                            {:stream out-stream-id :values values}
                                                            (if (sampler) 0))))
                                         (or out-tasks []) ;;send-spout-msg返回值, 發送的task lists或空[]
                                         ))]]
          (builtin-metrics/register-all (:builtin-metrics task-data) storm-conf (:user-context task-data)) ;;注冊builtin-metrics
          ;; 1.3.2 spout.open
          (.open spout-obj
                 storm-conf
                 (:user-context task-data)
                 (SpoutOutputCollector.
                  (reify ISpoutOutputCollector ;;實現ISpoutOutputCollector
                    (^List emit [this ^String stream-id ^List tuple ^Object message-id] ;;實現emit
                      (send-spout-msg stream-id tuple message-id nil)
                      )
                    (^void emitDirect [this ^int out-task-id ^String stream-id
                                       ^List tuple ^Object message-id]
                      (send-spout-msg stream-id tuple message-id out-task-id)
                      )
                    (reportError [this error]
                      (report-error error)
                      )))))
        (reset! open-or-prepare-was-called? true) 
        (log-message "Opened spout " component-id ":" (keys task-datas))
        ;; 1.3.3 setup-metrics! 
        (setup-metrics! executor-data) ;;使用schedule-recurring定期給自己發送METRICS_TICK tuple
        
        (disruptor/consumer-started! (:receive-queue executor-data)) ;;設置queue上面的consumerStartedFlag表示consumer已經啟動
        ;;1.3.4 fn
        (fn []
          ;; This design requires that spouts be non-blocking
          (disruptor/consume-batch receive-queue event-handler) ;;從recieve-queue取出batch tuples, 並使用tuple-action-fn處理
          
          ;; try to clear the overflow-buffer, 將overflow-buffer里面的數據放到發送的緩存queue里面
          (try-cause
            (while (not (.isEmpty overflow-buffer))
              (let [[out-task out-tuple] (.peek overflow-buffer)]
                (transfer-fn out-task out-tuple false nil)
                (.removeFirst overflow-buffer)))
          (catch InsufficientCapacityException e
            ))
          
          (let [active? @(:storm-active-atom executor-data)
                curr-count (.get emitted-count)]
            (if (and (.isEmpty overflow-buffer)  ;;只有當overflow-buffer為空, 並且pending沒有達到上限的時候, spout可以繼續emit tuple
                     (or (not max-spout-pending)
                         (< (.size pending) max-spout-pending)))
              (if active?  ;;storm集群是否active
                (do  ;;storm active
                  (when-not @last-active  ;;如果當前spout出於unactive狀態
                    (reset! last-active true)
                    (log-message "Activating spout " component-id ":" (keys task-datas))
                    (fast-list-iter [^ISpout spout spouts] (.activate spout))) ;;先active spout
               
                  (fast-list-iter [^ISpout spout spouts] (.nextTuple spout))) ;;調用nextTuple,產生新的tuple
                (do ;;storm unactive
                  (when @last-active ;;如果spout出於active狀態
                    (reset! last-active false)
                    (log-message "Deactivating spout " component-id ":" (keys task-datas))
                    (fast-list-iter [^ISpout spout spouts] (.deactivate spout))) ;;deactive spout並休眠
                  ;; TODO: log that it's getting throttled
                  (Time/sleep 100))))
            (if (and (= curr-count (.get emitted-count)) active?) ;;沒有能夠emit新的tuple(前后emitted-count沒有變化)
              (do (.increment empty-emit-streak)
                  (.emptyEmit spout-wait-strategy (.get empty-emit-streak))) ;;調用spout-wait-strategy進行sleep
              (.set empty-emit-streak 0)
              ))           
          0)) ;;返回0, 表示async-loop的sleep時間為0
      :kill-fn (:report-error-and-die executor-data)
      :factory? true
      :thread-name component-id)]))

1.1 定義pending

spout在emit tuple后, 會等待ack或fail, 所以這些tuple暫時不能直接從刪掉, 只能先放入pending隊列, 直到最終被ack或fail后, 才能被刪除

首先, tuple pending的個數是有限制的, p*num-tasks
p是TOPOLOGY-MAX-SPOUT-PENDING, num-tasks是spout的task數

max-spout-pending (executor-max-spout-pending storm-conf (count task-datas))
(defn executor-max-spout-pending [storm-conf num-tasks]
  (let [p (storm-conf TOPOLOGY-MAX-SPOUT-PENDING)]
    (if p (* p num-tasks))))

然后, spouts需要兩種情況下需要wait, nextTuple為空, 或達到maxSpoutPending上限

/**
 * The strategy a spout needs to use when its waiting. Waiting is
 * triggered in one of two conditions:
 * 
 * 1. nextTuple emits no tuples
 * 2. The spout has hit maxSpoutPending and can't emit any more tuples
 * 
 * The default strategy sleeps for one millisecond.
 */
public interface ISpoutWaitStrategy {
    void prepare(Map conf);
    void emptyEmit(long streak);
}

默認的wait策略是, sleep1毫秒, 可以在TOPOLOGY-SPOUT-WAIT-STRATEGY上配置特有的wait strategy class

^ISpoutWaitStrategy spout-wait-strategy (init-spout-wait-strategy storm-conf)

最后, 定義pending的結構, 並且pending是會設置超時的, 不然萬一后面的blot發生問題, 會導致spout block

pending (RotatingMap.
         2 ;; microoptimize for performance of .size method, buckets數為2
         (reify RotatingMap$ExpiredCallback
           (expire [this msg-id [task-id spout-id tuple-info start-time-ms]]
             (let [time-delta (if start-time-ms (time-delta-ms start-time-ms))]
               (fail-spout-msg executor-data (get task-datas task-id) spout-id tuple-info time-delta)
               ))))

RotatingMap (backtype.storm.utils), 是無cleaner線程版的TimeCacheMap(Storm starter - SingleJoinExample)

其他的基本一致, 主要數據結構為, LinkedList<HashMap<K, V>> _buckets;

最主要的操作是rotate, 刪除舊bucket, 添加新bucket

    public Map<K, V> rotate() {
        Map<K, V> dead = _buckets.removeLast();
        _buckets.addFirst(new HashMap<K, V>());
        if(_callback!=null) {
            for(Entry<K, V> entry: dead.entrySet()) {
                _callback.expire(entry.getKey(), entry.getValue());
            }
        }
        return dead;
    }

但RotatingMap需要外部的計數器來觸發rotate, storm是通過SYSTEM_TICK來觸發, 下面會看到

1.2 定義tuple-action-fn

tuple-action-fn, 處理不同stream的tuple

1.2.1 SYSTEM_TICK_STREAM_ID

(.rotate pending) rotate pending列表

1.2.2 METRICS_TICK_STREAM_ID

執行(metrics-tick executor-data task-datas tuple)

觸發component發送builtin-metrics的data, 到METRICS_STREAM, 最終發送到metric-bolt統計當前的component處理tuples的情況

具體邏輯, 就是創建task-info和data-points, 並send到METRICS_STREAM

(defn metrics-tick [executor-data task-datas ^TupleImpl tuple]
  (let [{:keys [interval->task->metric-registry ^WorkerTopologyContext worker-context]} executor-data
        interval (.getInteger tuple 0)] ;;metrics tick tuple的values[0]表示interval
    (doseq [[task-id task-data] task-datas
            :let [name->imetric (-> interval->task->metric-registry (get interval) (get task-id)) ;;topology context的_registeredMetrics實際指向interval->task->metric-registry 
                  task-info (IMetricsConsumer$TaskInfo.
                             (. (java.net.InetAddress/getLocalHost) getCanonicalHostName)
                             (.getThisWorkerPort worker-context)
                             (:component-id executor-data)
                             task-id
                             (long (/ (System/currentTimeMillis) 1000))
                             interval)
                  data-points (->> name->imetric
                                   (map (fn [[name imetric]]
                                          (let [value (.getValueAndReset ^IMetric imetric)]
                                            (if value
                                              (IMetricsConsumer$DataPoint. name value)))))
                                   (filter identity)
                                   (into []))]]
      (if (seq data-points)
        (task/send-unanchored task-data Constants/METRICS_STREAM_ID [task-info data-points]))))) ;;將[task-info data-points]發送到METRICS_STREAM

1.2.3 default, 普通tuple

對於spout而言, 作為topology的source, 收到的tuple只會是ACKER-ACK-STREAM或ACKER-FAIL-STREAM
所以收到tuple, 取得msgid, 從pending列表中刪除
最終根據steamid, 調用ack-spout-msg或fail-spout-msg

(defn- ack-spout-msg [executor-data task-data msg-id tuple-info time-delta]
  (let [storm-conf (:storm-conf executor-data)
        ^ISpout spout (:object task-data)
        task-id (:task-id task-data)]
    (when (= true (storm-conf TOPOLOGY-DEBUG))
      (log-message "Acking message " msg-id))
    (.ack spout msg-id) ;;ack
    (task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. msg-id task-id time-delta)) ;;執行ack hook
    (when time-delta      ;;滿足sample條件, 更新builtin-metrics和stats
      (builtin-metrics/spout-acked-tuple! (:builtin-metrics task-data) (:stats executor-data) (:stream tuple-info) time-delta)
      (stats/spout-acked-tuple! (:stats executor-data) (:stream tuple-info) time-delta))))

以ack-spout-msg為例, fail基本一樣, 只是調用.fail而已

1.3 async-loop thread

這是executor的主線程, 沒有使用disruptor.consume-loop來實現, 是因為這里不僅僅包含對recieve tuple的處理
所以使用async-loop來直接實現
前面也了解過, async-loop的實現是新開線程執行afn, 返回為sleeptime, 然后sleep sleeptime后繼續執行afn……
這里的實現比較奇特,
在afn中只是做了准備工作, 比如定義send-spout-msg, 初始化spout…
然后afn, 返回一個fn, 真正重要的工作在這個fn里面執行了, 因為sleeptime在作為函數參數的時候, 也一定會先被evaluate
比較奇葩, 為什么要這樣...

1.3.1 send-spout-msg

首先生成send-spout-msg函數, 這個函數最終被emit, emitDirect調用, 用於發送spout msg
所以邏輯就是首先根據message-id判斷是否需要track, 需要則利用MessageId生成root-id和out-id
然后生成tuple對象(TupleImpl)
先看看MessageId和TupleImpl的定義

這里的MessageId和emit傳入的message-id沒有什么關系, 這個名字起的容易混淆
這里主要的操作就是通過generateId產生隨機id, 然后通過makeRootId, 將[root-id, out-id]加入Map, anchorsToIds

package backtype.storm.tuple;
public class MessageId {
    private Map<Long, Long> _anchorsToIds;
    
    public static long generateId(Random rand) {
        return rand.nextLong();
    }

    public static MessageId makeUnanchored() {
        return makeId(new HashMap<Long, Long>());
    }
        
    public static MessageId makeId(Map<Long, Long> anchorsToIds) {
        return new MessageId(anchorsToIds);
    }
        
    public static MessageId makeRootId(long id, long val) {
        Map<Long, Long> anchorsToIds = new HashMap<Long, Long>();
        anchorsToIds.put(id, val);
        return new MessageId(anchorsToIds);
    }
public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed, IMeta, Tuple {
    private List<Object> values;
    private int taskId;
    private String streamId;
    private GeneralTopologyContext context;
    private MessageId id;
    private IPersistentMap _meta = null;

    Long _processSampleStartTime = null;
    Long _executeSampleStartTime = null;
}

后面做的事, 使用transfer-fn將tuple發到發送queue, 然后在pending中增加item用於tracking, 並send message到acker通知它track這個message

1.3.2 spout.open, 初始化spout

很簡單, 關鍵是實現ISpoutOutputCollector, emit, emitDirect

1.3.3 setup-metrics!, METRICS_TICK的來源

使用schedule-recurring定期給自己發送METRICS_TICK tuple, 以觸發builtin-metrics的定期發送

1.3.4 fn

里面做了spout thread最關鍵的幾件事, 最終返回0, 表示async-loop的sleep時間
handle recieve-queue里面的tuple
調用nextTuple…
注意所有事情都是在一個線程里面順序做的, 所以不能有block的邏輯

 

2. Bolt Thread

(defmethod mk-threads :bolt [executor-data task-datas]
  (let [execute-sampler (mk-stats-sampler (:storm-conf executor-data))
        executor-stats (:stats executor-data)
        {:keys [storm-conf component-id worker-context transfer-fn report-error sampler
                open-or-prepare-was-called?]} executor-data
        rand (Random. (Utils/secureRandomLong))
 
        ;;2.1 tuple-action-fn
        tuple-action-fn (fn [task-id ^TupleImpl tuple]
                          (let [stream-id (.getSourceStreamId tuple)]
                            (condp = stream-id
                              Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data task-datas tuple)
                              (let [task-data (get task-datas task-id)
                                    ^IBolt bolt-obj (:object task-data)  ;;取出bolt對象
                                    user-context (:user-context task-data)
                                    sampler? (sampler)  
                                    execute-sampler? (execute-sampler)
                                    now (if (or sampler? execute-sampler?) (System/currentTimeMillis))] ;;滿足sample條件,記錄當前時間
                                (when sampler?
                                  (.setProcessSampleStartTime tuple now))
                                (when execute-sampler?
                                  (.setExecuteSampleStartTime tuple now))
                                (.execute bolt-obj tuple) ;;調用Bolt的execute方法
                                     (let [delta (tuple-execute-time-delta! tuple)] ;;只有上面生成了now, 這里delta才不為空
                                       (task/apply-hooks user-context .boltExecute (BoltExecuteInfo. tuple task-id delta)) ;;執行boltExecute hook
                                  (when delta  ;;滿足sample條件, 則更新builtin-metrics和stats
                                    (builtin-metrics/bolt-execute-tuple! (:builtin-metrics task-data)
                                                                         executor-stats
                                                                         (.getSourceComponent tuple)                                                      
                                                                         (.getSourceStreamId tuple)
                                                                         delta)
                                    (stats/bolt-execute-tuple! executor-stats
                                                               (.getSourceComponent tuple)
                                                               (.getSourceStreamId tuple)
                                                               delta)))))))]
    
    ;; TODO: can get any SubscribedState objects out of the context now
    ;;2.2 async-loop
    [(async-loop
      (fn []
        ;; If topology was started in inactive state, don't call prepare bolt until it's activated first.
        (while (not @(:storm-active-atom executor-data))          
          (Thread/sleep 100))
        
        (log-message "Preparing bolt " component-id ":" (keys task-datas))
        (doseq [[task-id task-data] task-datas
                :let [^IBolt bolt-obj (:object task-data)
                      tasks-fn (:tasks-fn task-data)
                      user-context (:user-context task-data)
                      ;;2.2.1 bolt-emit
                      bolt-emit (fn [stream anchors values task]
                                  (let [out-tasks (if task
                                                    (tasks-fn task stream values) ;;direct grouping
                                                    (tasks-fn stream values))]
                                    (fast-list-iter [t out-tasks] ;;每個target out-task
                                                    (let [anchors-to-ids (HashMap.)] ;;初始化,用於保存tuple上產生的edges和roots之間的關系
                                                      (fast-list-iter [^TupleImpl a anchors] ;;每個anchor(源tuple)
                                                                      (let [root-ids (-> a .getMessageId .getAnchorsToIds .keySet)] ;;得到所有的root-ids,anchor可能來自多個源
                                                                        (when (pos? (count root-ids))
                                                                          (let [edge-id (MessageId/generateId rand)] ;;為每個anchor產生新的edge-id
                                                                            (.updateAckVal a edge-id) ;;和anchor tuple的_outAckVal做異或, 緩存新產生的edgeid
                                                                            (fast-list-iter [root-id root-ids]
                                                                                            (put-xor! anchors-to-ids root-id edge-id)) ;;生成新的anchors-to-ids, 保存新edge和所有root-id的關系到anchors-to-ids 
                                                                            ))))
                                                      (transfer-fn t
                                                                   (TupleImpl. worker-context
                                                                               values
                                                                               task-id
                                                                               stream
                                                                               (MessageId/makeId anchors-to-ids)))))
                                    (or out-tasks [])))]] ;;返回值, target task ids
          (builtin-metrics/register-all (:builtin-metrics task-data) storm-conf user-context)
 
           2.2.2 prepare
          (.prepare bolt-obj
                    storm-conf
                    user-context
                    (OutputCollector.
                     (reify IOutputCollector
                       (emit [this stream anchors values]
                         (bolt-emit stream anchors values nil))
                       (emitDirect [this task stream anchors values]
                         (bolt-emit stream anchors values task))
                       (^void ack [this ^Tuple tuple]
                         (let [^TupleImpl tuple tuple
                               ack-val (.getAckVal tuple)] ;;取出緩存的新edges
                              (fast-map-iter [[root id] (.. tuple getMessageId getAnchorsToIds)] ;;對於anchors-to-ids中記錄的每個root進行ack
                                          (task/send-unanchored task-data
                                                                ACKER-ACK-STREAM-ID
                                                                [root (bit-xor id ack-val)])  ;;發送ack消息, ack和同步新edges
                                          ))
                         (let [delta (tuple-time-delta! tuple)]  ;;更新metrics和stats
                           (task/apply-hooks user-context .boltAck (BoltAckInfo. tuple task-id delta))
                           (when delta
                             (builtin-metrics/bolt-acked-tuple! (:builtin-metrics task-data)
                                                                executor-stats
                                                                (.getSourceComponent tuple)                                                      
                                                                (.getSourceStreamId tuple)
                                                                delta)
                             (stats/bolt-acked-tuple! executor-stats
                                                      (.getSourceComponent tuple)
                                                      (.getSourceStreamId tuple)
                                                      delta))))
                       (^void fail [this ^Tuple tuple]
                         (fast-list-iter [root (.. tuple getMessageId getAnchors)]
                                         (task/send-unanchored task-data
                                                               ACKER-FAIL-STREAM-ID
                                                               [root])) ;;對應fail比較簡單,任意一個edge失敗,都表示root失敗
                         (let [delta (tuple-time-delta! tuple)]
                           (task/apply-hooks user-context .boltFail (BoltFailInfo. tuple task-id delta))
                           (when delta
                             (builtin-metrics/bolt-failed-tuple! (:builtin-metrics task-data)
                                                                 executor-stats
                                                                 (.getSourceComponent tuple)                                                      
                                                                 (.getSourceStreamId tuple))
                             (stats/bolt-failed-tuple! executor-stats
                                                       (.getSourceComponent tuple)
                                                       (.getSourceStreamId tuple)
                                                       delta))))
                       (reportError [this error]
                         (report-error error)
                         )))))
        (reset! open-or-prepare-was-called? true)        
        (log-message "Prepared bolt " component-id ":" (keys task-datas))
        (setup-metrics! executor-data)  ;;創建metrics tick

        (let [receive-queue (:receive-queue executor-data)
              event-handler (mk-task-receiver executor-data tuple-action-fn)]  ;;用tuple-action-fn創建receive queue的event-handler
          (disruptor/consumer-started! receive-queue) ;;標識consumer開始運行
          (fn []            
            (disruptor/consume-batch-when-available receive-queue event-handler) ;;真正的consume receive-queue  
            0))) ;;sleep 0s
      :kill-fn (:report-error-and-die executor-data)
      :factory? true
      :thread-name component-id)]))

 

2.1 tuple-action-fn

先判斷tuple的stream-id, 對於METRICS_TICK的處理參考上面

否則, 就是普通的tuple, 用對應的task去處理
對於一個executor線程中包含多個task, 其實就是這里根據task-id選擇不同的task-data
並且最終調用bolt-obj的execute, 這就是user定義的bolt邏輯

^IBolt bolt-obj (:object task-data)

(.execute bolt-obj tuple)

 

2.2 async-loop, 啟動線程

2.2.1 bolt-emit

類似send-spout-msg, 被emit調用, 用於發送tuple, Storm的命名風格不統一
調用task-fn產生out-tasks, 以及調用transfer-fn, 將tuples發送到發送隊列都比較好理解

關鍵中一段對於anchors-to-ids的操作, 剛開始有些費解...這個anchors-to-ids 到底干嗎用的?

用於記錄的DAG圖中, 該tuple產生的edge, 以及和root的關系
代碼里面anchor表示的是源tuple, 而理解上anchor更象是一種關系, 所以有些confuse 
所以上面的邏輯就是新產生edge-id, 雖然相同的out-task, 但不同的anchor會產生不同的edge-id
然后對每個anchor的root-ids, 產生map [root-id, edge-id] (上面的邏輯是異或, 因為不同anchors可能有相同的root)
最終就是得到該tuple產生edges和所有相關的roots之間的關系

image

然后其中的(.updateAckVal a edge-id)是干嗎的?
為了節省一次向acker的消息發送, 理論上, 應該在創建edge的時候發送一次消息去acker上注冊一下, 然后在ack的時候再發送一次消息去acker完成ack
但是storm做了優化, 節省了在創建edge的這次消息發送
優化的做法是,
將新創建的edge-id, 緩存在父tuple的_outAckVal上, 因為處理完緊接着會去ack父tuple, 所以在這個時候將新創建的edge信息一起同步到acker,具體看下面的ack實現
所以這里調用updateAckVal去更新父tuple的_outAckVal(做異或), 而沒有向acker發送消息

關於storm跟蹤所有tuple的方法
傳統的方法, 在spout的時候, 生成rootid, 之后每次emit tuple, 產生一條edgeid, 就可以記錄下整個DAG
然后在ack的時候, 只需要標記或刪除這些edgeid, 表明已經處理完就ok.
這樣的問題在於, 如果DAG圖比較復雜, 那么這個結構會很大, 可擴展性不好
storm采用的方法是, 不需要記錄具體的每條edge, 因為實際上他並不關心有哪些edge, 他只關心每條edge是否都被ack了, 所以只需要不停的做異或, 成對的異或結果為0

 

2.2.1 prepare

主要在於OutputCollector的實現,

其中emit和emitDirect都是直接調用bolt-emit, 很簡單

重點就是ack和fail的實現

其中比較難理解的是, 發送ack消息是不是直接發送本身的edge-id, 而是(bit-xor id ack-val)
其實做了兩件事, ack當前tuple和同步新的edges
因為acker拿到id和ack-val也是和acker記錄的值做異或, 所以這里先直接做異或, 省得在消息中需要發送兩個參數

總結

如果有耐心看到這兒, 再附送兩幅圖...

image

 

image


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM