backtype.storm.cluster.clj defines how Storm uses ZooKeeper.
ClusterState
First, it defines the interface (a protocol) for operating on the ZooKeeper cluster:
(defprotocol ClusterState
  (set-ephemeral-node [this path data])
  (delete-node [this path])
  (create-sequential [this path data])
  (set-data [this path data]) ;; if node does not exist, create persistent with this data
  (get-data [this path watch?])
  (get-children [this path watch?])
  (mkdirs [this path])
  (close [this])
  (register [this callback])
  (unregister [this id]))
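Next, mk-distributed-cluster-state implements this protocol and creates the object (via reify) used to operate on the ZooKeeper cluster.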
It first creates a zk client, uses it to create the STORM-ZOOKEEPER-ROOT directory on zk, and then closes that client.
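It then defines:
callbacks, the collection of registered callbacks (a map from id to callback function)
active, a flag marking whether this cluster state is still open; close sets it to false
zk, the zk client, rooted at STORM-ZOOKEEPER-ROOT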
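When the zk client is created, a watcher is installed. Whenever something changes on the zk server, the server sends an event to the client, and this watcher passes the event to every callback in callbacks. As the code below shows, events are ignored once active is false, a warning is logged if the state is not :connected, and events of type :none (connection-state changes only) are not forwarded.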
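To see the watcher's filtering rules in isolation, here is a runnable sketch of the same watcher function pulled out of mk-distributed-cluster-state. The factory name make-watcher is hypothetical, println stands in for Storm's log-warn, and the event-type keywords (:node-children-changed and so on) are assumed to mirror the ones Storm produces.

```clojure
(defn make-watcher
  "Hypothetical factory for a watcher like the :watcher fn in mk-distributed-cluster-state.
  active: atom holding a boolean; callbacks: atom holding {id (fn [type path])}."
  [active callbacks]
  (fn [state type path]
    (when @active
      ;; Connection trouble is only logged; println stands in for Storm's log-warn
      (when-not (= :connected state)
        (println "Received event" state ":" type ":" path "with disconnected Zookeeper."))
      ;; :none events carry only connection-state changes, so they are not forwarded
      (when-not (= :none type)
        (doseq [callback (vals @callbacks)]
          (callback type path))))))

;; Usage: a single callback that records what it receives
(def seen (atom []))
(def callbacks (atom {"cb-1" (fn [type path] (swap! seen conj [type path]))}))
(def active (atom true))
(def watcher (make-watcher active callbacks))

(watcher :connected :node-children-changed "/assignments") ; forwarded to the callback
(watcher :connected :none nil)                            ; dropped: type is :none
(reset! active false)
(watcher :connected :node-data-changed "/storms/topo-1")  ; dropped: no longer active
```

Only the first event reaches the callback. The active flag is what makes close safe: any events still in flight after close are silently dropped.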
Storm operates on ZooKeeper through CuratorFramework (http://curator.incubator.apache.org/curator-framework/index.html).
Finally, it implements the ClusterState protocol. register and unregister add entries to and remove entries from callbacks, and the other methods are ordinary zk operations.
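The split between register/unregister (callback bookkeeping) and the data operations can be tried without a zk server. Below is a sketch of a hypothetical in-memory stand-in. MiniClusterState and mk-in-memory-cluster-state are illustrative names, not Storm APIs. A simplification to keep in mind: real zk sends events only for nodes that carry a watch, and delivers them through the watcher. Here, every set-data notifies all registered callbacks directly.

```clojure
(defprotocol MiniClusterState
  ;; Hypothetical subset of ClusterState, for illustration only
  (set-data [this path data])
  (get-data [this path])
  (register [this callback])
  (unregister [this id]))

(defn mk-in-memory-cluster-state []
  (let [nodes (atom {})       ; path -> data, stands in for the zk tree
        callbacks (atom {})]  ; id -> (fn [type path]), as in mk-distributed-cluster-state
    (letfn [(notify [type path]
              (doseq [cb (vals @callbacks)]
                (cb type path)))]
      (reify MiniClusterState
        (set-data [this path data]
          ;; Like ClusterState.set-data: create the node if missing, otherwise overwrite it
          (let [existed? (contains? @nodes path)]
            (swap! nodes assoc path data)
            (notify (if existed? :node-data-changed :node-created) path)))
        (get-data [this path]
          (get @nodes path))
        (register [this callback]
          ;; Return a uuid so the caller can unregister later, like state-id in Storm
          (let [id (str (java.util.UUID/randomUUID))]
            (swap! callbacks assoc id callback)
            id))
        (unregister [this id]
          (swap! callbacks dissoc id))))))
```

Usage: register a callback and keep the returned id, call set-data twice (one create event, then one data-changed event), then unregister the callback. After that, further writes produce no events.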
(defn mk-distributed-cluster-state [conf]
  (let [zk (zk/mk-client conf
                         (conf STORM-ZOOKEEPER-SERVERS)
                         (conf STORM-ZOOKEEPER-PORT)
                         :auth-conf conf)]
    (zk/mkdirs zk (conf STORM-ZOOKEEPER-ROOT))
    (.close zk))
  (let [callbacks (atom {})
        active (atom true)
        zk (zk/mk-client conf
                         (conf STORM-ZOOKEEPER-SERVERS)
                         (conf STORM-ZOOKEEPER-PORT)
                         :auth-conf conf
                         :root (conf STORM-ZOOKEEPER-ROOT)
                         :watcher (fn [state type path]
                                    (when @active
                                      (when-not (= :connected state)
                                        (log-warn "Received event " state ":" type ":" path " with disconnected Zookeeper."))
                                      (when-not (= :none type)
                                        (doseq [callback (vals @callbacks)]
                                          (callback type path))))))]
    (reify ClusterState
      (register [this callback]
        (let [id (uuid)]
          (swap! callbacks assoc id callback)
          id))
      (unregister [this id]
        (swap! callbacks dissoc id))
      (set-ephemeral-node [this path data]
        (zk/mkdirs zk (parent-path path))
        (if (zk/exists zk path false)
          (try-cause
            (zk/set-data zk path data) ; should verify that it's ephemeral
            (catch KeeperException$NoNodeException e
              (log-warn-error e "Ephemeral node disappeared between checking for existing and setting data")
              (zk/create-node zk path data :ephemeral)))
          (zk/create-node zk path data :ephemeral)))
      (create-sequential [this path data]
        (zk/create-node zk path data :sequential))
      (set-data [this path data]
        ;; note: this does not turn off any existing watches
        (if (zk/exists zk path false)
          (zk/set-data zk path data)
          (do
            (zk/mkdirs zk (parent-path path))
            (zk/create-node zk path data :persistent))))
      (delete-node [this path]
        (zk/delete-recursive zk path))
      (get-data [this path watch?]
        (zk/get-data zk path watch?))
      (get-children [this path watch?]
        (zk/get-children zk path watch?))
      (mkdirs [this path]
        (zk/mkdirs zk path))
      (close [this]
        (reset! active false)
        (.close zk)))))
StormClusterState
This protocol defines ZooKeeper operations tailored to Storm, covering reads and writes of the various pieces of Storm state kept in zk.
(defprotocol StormClusterState
  (assignments [this callback])
  (assignment-info [this storm-id callback])
  (active-storms [this])
  (storm-base [this storm-id callback])
  (get-worker-heartbeat [this storm-id node port])
  (executor-beats [this storm-id executor->node+port])
  (supervisors [this callback])
  (supervisor-info [this supervisor-id]) ;; returns nil if doesn't exist
  (setup-heartbeats! [this storm-id])
  (teardown-heartbeats! [this storm-id])
  (teardown-topology-errors! [this storm-id])
  (heartbeat-storms [this])
  (error-topologies [this])
  (worker-heartbeat! [this storm-id node port info])
  (remove-worker-heartbeat! [this storm-id node port])
  (supervisor-heartbeat! [this supervisor-id info])
  (activate-storm! [this storm-id storm-base])
  (update-storm! [this storm-id new-elems])
  (remove-storm-base! [this storm-id])
  (set-assignment! [this storm-id info])
  (remove-storm! [this storm-id])
  (report-error [this storm-id task-id error])
  (errors [this storm-id task-id])
  (disconnect [this]))
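First, mk-storm-cluster-state checks whether cluster-state-spec already satisfies ClusterState, that is, whether a zk cluster state has already been created and passed in. If it has not, the argument is treated as a conf and mk-distributed-cluster-state is called to create one. solo? records whether the cluster state was created here (true) or passed in (false).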
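Next, it defines a set of callback atoms (assignment-info-callback, supervisors-callback, assignments-callback, storm-base-callback). It then uses cluster-state's register to add a single dispatch function to the callbacks list. That function splits the event path and issues the callback that matches the changed subtree.
state-id is the uuid that register returns for this dispatch function.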
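Then it creates Storm's subdirectories on zk (ASSIGNMENTS-SUBTREE, STORMS-SUBTREE, SUPERVISORS-SUBTREE, WORKERBEATS-SUBTREE, ERRORS-SUBTREE).
Finally, it implements the StormClusterState protocol, which reads and writes the various kinds of zk data.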
(defn mk-storm-cluster-state [cluster-state-spec]
  (let [[solo? cluster-state] (if (satisfies? ClusterState cluster-state-spec)
                                [false cluster-state-spec]
                                [true (mk-distributed-cluster-state cluster-state-spec)])
        assignment-info-callback (atom {})
        supervisors-callback (atom nil)
        assignments-callback (atom nil) ; set in StormClusterState.assignments
        storm-base-callback (atom {})
        state-id (register
                   cluster-state
                   (fn [type path]
                     (let [[subtree & args] (tokenize-path path)] ; split path on '/'
                       (condp = subtree ; switch/case on the subtree part of the path
                         ASSIGNMENTS-ROOT (if (empty? args)
                                            ;; issue-callback! runs the callback and clears it, so it runs only once
                                            (issue-callback! assignments-callback)
                                            (issue-map-callback! assignment-info-callback (first args)))
                         SUPERVISORS-ROOT (issue-callback! supervisors-callback)
                         STORMS-ROOT (issue-map-callback! storm-base-callback (first args))
                         ;; this should never happen
                         (halt-process! 30 "Unknown callback for subtree " subtree args)))))]
    (doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE WORKERBEATS-SUBTREE ERRORS-SUBTREE]]
      (mkdirs cluster-state p))
    (reify StormClusterState
      ;; ... implementations of the StormClusterState methods omitted ...
      )))
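The dispatch function's routing can be checked on its own. The sketch below reimplements tokenize-path with clojure.string/split and returns which callback would be issued, rather than issuing it. Assumptions, stated plainly: the root constants are assumed to be "assignments", "supervisors" and "storms"; route is a hypothetical name; and unknown subtrees return [:unknown subtree] instead of halting the process.

```clojure
(require '[clojure.string :as str])

;; Assumed values of Storm's root constants
(def ASSIGNMENTS-ROOT "assignments")
(def SUPERVISORS-ROOT "supervisors")
(def STORMS-ROOT "storms")

(defn tokenize-path
  "Split a zk path on '/' and drop the empty segments (e.g. the leading one)."
  [path]
  (vec (remove empty? (str/split path #"/"))))

(defn route
  "Hypothetical: report which callback the dispatcher in mk-storm-cluster-state would issue."
  [path]
  (let [[subtree & args] (tokenize-path path)]
    (condp = subtree
      ASSIGNMENTS-ROOT (if (empty? args)
                         [:assignments-callback]                    ; the list of assignments changed
                         [:assignment-info-callback (first args)])  ; one topology's assignment changed
      SUPERVISORS-ROOT [:supervisors-callback]
      STORMS-ROOT [:storm-base-callback (first args)]
      [:unknown subtree])))                                         ; Storm halts the process here
```

For example, (route "/assignments") gives [:assignments-callback], and (route "/assignments/topo-1") gives [:assignment-info-callback "topo-1"]. A change to the assignments directory itself therefore goes to the single assignments-callback. A change under one topology's node goes to that topology's entry in the assignment-info-callback map.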
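Example
The following scenario illustrates how Storm uses ZooKeeper.
In the supervisor, mk-synchronize-supervisor is mainly responsible for downloading the code of newly assigned topologies and deleting the code of topologies that are no longer in use.
Running this logic once is therefore not enough: it has to run again every time the assignments change.
Storm solves this with ZooKeeper watchers: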
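1. When mk-distributed-cluster-state creates the zk client, it configures a watcher. On receiving an event from the zk server, the watcher calls each callback in the callbacks list to handle it.
2. The callback that mk-storm-cluster-state registers there uses the path in the event (which tells which part of the data changed) to issue one of the callbacks maintained in storm-cluster-state.
For example, when ASSIGNMENTS-ROOT changes, assignments-callback is called.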
3. In other words, it is enough to set mk-synchronize-supervisor as assignments-callback. That guarantees that whenever ASSIGNMENTS-ROOT changes, mk-synchronize-supervisor is called to synchronize the topology code.
When is it set? The first time mk-synchronize-supervisor runs:
sync-callback (fn [& ignored] (.add event-manager this))
assignments-snapshot (assignments-snapshot storm-cluster-state sync-callback)
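Synchronizing topology code is time-consuming, so it runs in the background. sync-callback only adds this (the synchronize function itself) to the event-manager's queue, and a background thread executes it.
In addition, when assignments is called to obtain assignments-snapshot, sync-callback is stored in assignments-callback: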
(assignments [this callback]
(when callback
(reset! assignments-callback callback))
(get-children cluster-state ASSIGNMENTS-SUBTREE (not-nil? callback)))
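By the way, get-children behaves differently depending on whether a callback is passed, i.e. whether watch? is true. With watch? true it calls Curator's watched(), which leaves a ZooKeeper watch on the node. The next change to the node's children then makes the server send an event, which reaches the watcher described above. Without it, no watch is set. The list of children that is read back is the same either way.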
(defn get-children [^CuratorFramework zk ^String path watch?]
  (if watch?
    (.. zk (getChildren) (watched) (forPath (normalize-path path)))
    (.. zk (getChildren) (forPath (normalize-path path)))))
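4. As mentioned earlier, issue-callback! clears assignments-callback before running it. For the callback to fire repeatedly, assignments-callback therefore has to be set again each time.
That is why mk-synchronize-supervisor, acting as the callback, first re-sets assignments-callback by calling assignments-snapshot with sync-callback.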
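The one-shot-then-re-arm cycle of steps 3 and 4 can be simulated without zk. In this sketch, issue-callback! takes the callback out of its atom before running it. It uses reset-vals! (Clojure 1.9+) so that take-and-clear is a single atomic step; Storm's own helper may be written differently. assignments mimics StormClusterState.assignments. synchronize! is a hypothetical stand-in for mk-synchronize-supervisor that calls itself directly instead of going through the event manager.

```clojure
(defn issue-callback!
  "Take the callback out of cb-atom (leaving nil) and run it, so each armed callback fires at most once."
  [cb-atom]
  (let [[cb _] (reset-vals! cb-atom nil)]
    (when cb
      (cb))))

;; Hypothetical stand-ins for the zk-backed pieces
(def assignments-callback (atom nil))
(def assignment-paths (atom ["topo-1"]))

(defn assignments
  "Like StormClusterState.assignments: arm the one-shot callback, then read."
  [callback]
  (when callback
    (reset! assignments-callback callback))
  @assignment-paths)

(def sync-count (atom 0))

(defn synchronize!
  "Hypothetical stand-in for mk-synchronize-supervisor."
  []
  ;; Re-arm first by passing ourselves as the callback, then do the (trivial) work
  (let [snapshot (assignments (fn [& _ignored] (synchronize!)))]
    (swap! sync-count inc)
    snapshot))

(synchronize!)                         ; first run arms the callback
(issue-callback! assignments-callback) ; simulated change to the assignments
(issue-callback! assignments-callback) ; fires again only because synchronize! re-armed it
```

After these three calls sync-count is 3. If synchronize! did not pass itself to assignments, the second issue-callback! would find nil and do nothing.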
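Why use this mechanism? It is not fully clear yet. One plausible reason is that ZooKeeper watches are themselves one-time triggers: after a watch fires, it has to be set again by another read with watch? true. Re-arming the callback inside assignments, in the same call that re-reads with a watch, keeps the callback and the watch in step.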