storm操作zookeeper的主要函數都定義在命名空間backtype.storm.cluster中(即cluster.clj文件中)。 backtype.storm.cluster定義了兩個重要protocol:ClusterState和StormClusterState。clojure中的protocol可以看成java中的接口,封裝了一組方法。ClusterState協議中封裝了一組與zookeeper進行交互的基礎函數,如獲取子節點函數,獲取子節點數據函數等,ClusterState協議定義如下:
關於storm操作zookeeper的詳細分析請參見博客:storm操作zookeeper源碼分析-cluster.clj
Zookeeper的操作
(
defprotocol
ClusterState
( set-ephemeral-node [ this path data ])
( delete-node [ this path ])
( create-sequential [ this path data ])
;; if node does not exist, create persistent with this data
( set-data [ this path data ])
( get-data [ this path watch? ])
( get-version [ this path watch? ])
( get-data-with-version [ this path watch? ])
( get-children [ this path watch? ])
( mkdirs [ this path ])
( close [ this ])
( register [ this callback ])
( unregister [ this id ]))
( set-ephemeral-node [ this path data ])
( delete-node [ this path ])
( create-sequential [ this path data ])
;; if node does not exist, create persistent with this data
( set-data [ this path data ])
( get-data [ this path watch? ])
( get-version [ this path watch? ])
( get-data-with-version [ this path watch? ])
( get-children [ this path watch? ])
( mkdirs [ this path ])
( close [ this ])
( register [ this callback ])
( unregister [ this id ]))
Storm使用Zookeeper的操作
(
defprotocol
StormClusterState
( assignments [ this callback ])
( assignment-info [ this storm-id callback ])
( assignment-info-with-version [ this storm-id callback ])
( assignment-version [ this storm-id callback ])
( active-storms [ this ])
( storm-base [ this storm-id callback ])
( get-worker-heartbeat [ this storm-id node port ])
( executor-beats [ this storm-id executor->node+port ])
( supervisors [ this callback ])
( supervisor-info [ this supervisor-id ]) ;; returns nil if doesn't exist
( setup-heartbeats! [ this storm-id ])
( teardown-heartbeats! [ this storm-id ])
( teardown-topology-errors! [ this storm-id ])
( heartbeat-storms [ this ])
( error-topologies [ this ])
( worker-heartbeat! [ this storm-id node port info ])
( remove-worker-heartbeat! [ this storm-id node port ])
( supervisor-heartbeat! [ this supervisor-id info ])
( activate-storm! [ this storm-id storm-base ])
( update-storm! [ this storm-id new-elems ])
( remove-storm-base! [ this storm-id ])
( set-assignment! [ this storm-id info ])
( remove-storm! [ this storm-id ])
( report-error [ this storm-id task-id node port error ])
( errors [ this storm-id task-id ])
( disconnect [ this ]))
( assignments [ this callback ])
( assignment-info [ this storm-id callback ])
( assignment-info-with-version [ this storm-id callback ])
( assignment-version [ this storm-id callback ])
( active-storms [ this ])
( storm-base [ this storm-id callback ])
( get-worker-heartbeat [ this storm-id node port ])
( executor-beats [ this storm-id executor->node+port ])
( supervisors [ this callback ])
( supervisor-info [ this supervisor-id ]) ;; returns nil if doesn't exist
( setup-heartbeats! [ this storm-id ])
( teardown-heartbeats! [ this storm-id ])
( teardown-topology-errors! [ this storm-id ])
( heartbeat-storms [ this ])
( error-topologies [ this ])
( worker-heartbeat! [ this storm-id node port info ])
( remove-worker-heartbeat! [ this storm-id node port ])
( supervisor-heartbeat! [ this supervisor-id info ])
( activate-storm! [ this storm-id storm-base ])
( update-storm! [ this storm-id new-elems ])
( remove-storm-base! [ this storm-id ])
( set-assignment! [ this storm-id info ])
( remove-storm! [ this storm-id ])
( report-error [ this storm-id task-id node port error ])
( errors [ this storm-id task-id ])
( disconnect [ this ]))
Storm中在Zookeeper中存儲的目錄
(
def
ASSIGNMENTS-ROOT
"assignments")
( def CODE-ROOT "code")
( def STORMS-ROOT "storms")
( def SUPERVISORS-ROOT "supervisors")
( def WORKERBEATS-ROOT "workerbeats")
( def ERRORS-ROOT "errors")
( def ASSIGNMENTS-SUBTREE ( str "/" ASSIGNMENTS-ROOT))
( def STORMS-SUBTREE ( str "/" STORMS-ROOT))
( def SUPERVISORS-SUBTREE ( str "/" SUPERVISORS-ROOT))
( def WORKERBEATS-SUBTREE ( str "/" WORKERBEATS-ROOT))
( def ERRORS-SUBTREE ( str "/" ERRORS-ROOT))
( def CODE-ROOT "code")
( def STORMS-ROOT "storms")
( def SUPERVISORS-ROOT "supervisors")
( def WORKERBEATS-ROOT "workerbeats")
( def ERRORS-ROOT "errors")
( def ASSIGNMENTS-SUBTREE ( str "/" ASSIGNMENTS-ROOT))
( def STORMS-SUBTREE ( str "/" STORMS-ROOT))
( def SUPERVISORS-SUBTREE ( str "/" SUPERVISORS-ROOT))
( def WORKERBEATS-SUBTREE ( str "/" WORKERBEATS-ROOT))
( def ERRORS-SUBTREE ( str "/" ERRORS-ROOT))
從上面來看,在Zookeeper中主要是有如下的五個子目錄:
- /assignments -> 任務分配信息
- /storms -> 正在運行的topology的ID
- /supervisors -> 所有的Supervisors的心跳信息
- /workerbeats -> 所有的Worker的心跳
- /errors -> 產生的出錯信息
結構圖
/-
{
storm-zk-root
}
--
storm
在
zookeeper
上的根目錄(
默認為
/storm)
|
| -/assignments -- topology 的任務分配信息
| |
| | -/ { topology-id } -- 這個下面保存的是每個 topology 的 assignments 信息包括: 對應的
| nimbus 上的代碼目錄, 所有 task 的啟動時間, 每個 task 與機器、端口的映射
| 操作為( assignments) 來獲取所有的 assignments 的值;
| 以及( assignment-info storm-id) 來得到給定的 storm-id 對應的 AssignmentInfo 信息
| 在 AssignmentInfo 中存儲的內容有:
| :executor->node+port :executor->start-time-secs :node->host
| 具體定義在 common.clj 中的
| ( defrecord Assignment [ master-code-dir node->host executor->node+port executor->start-time-secs ])
|
| -/storms -- 這個目錄保存所有正在運行的topology的id
| |
| |
| | -/ { topology-id } -- 這個文件保存這個 topology 的一些信息,包括 topology 的
| 名字, topology 開始運行的時間以及這個 topology 的狀態
| 操作( active-storms), 獲得當前路徑下活躍的 topology 數據。保存的內容參考類 StormBase
| ( storm-base storm-id) 得到給定的 storm-id 下的 StormBase 數據, 具體定義在 common.clj 中的
| ( defrecord StormBase [ storm-name launch-time-secs status num-workers component->executors ])
|
| -/supervisors -- 這個目錄保存所有的 supervisor的心跳信息
| |
| |
| | -/ { supervisor-id } -- 這個文件保存的是 supervisor 的心跳信息包括:心跳時間,主
| 機名,這個 supervisor 上 worker 的端口號運行時間( 具體看 SupervisorInfo 類)
| 操作( supervisors) 得到所有的 supervisors 節點
| ( supervisor-info supervisor-id) 得到給定的 supervisor-id 對應的 SupervisorInfo 信息
| 具體定義在 common.clj 中的
| ( defrecord SupervisorInfo [ time-secs hostname assignment-id used-ports meta scheduler-meta uptime-secs ])
|
| -/workerbeats -- 所有 worker 的心跳
| |
| | -/ { topology-id } -- 這個目錄保存這個 topology 的所有的 worker 的心跳信息
| |
| | -/ { supervisorId-port } -- worker 的心跳信息,包括心跳的時
| 間, worker 運行時間以及一些統計信息
| 操作( heartbeat-storms) 得到所有有心跳數據的 topology ,
| ( get-worker-heartbeat storm-id node port) 得到具體一個 topology 下的某個 worker( node :port) 的心跳狀況,
| ( executor-beats storm-id executor->node+port) 得到一個 executor 的心跳狀況
|
| -/errors -- 所有產生的 error 信息
|
| -/ { topology-id } -- 這個目錄保存這個 topology 下面的錯誤信息
| 操作( error-topologies) 得到出錯的 topology
| ( errors storm-id component-id) 得到給定的 storm-id component-id 下的出錯信息
| -/ { component-id }
|
| -/assignments -- topology 的任務分配信息
| |
| | -/ { topology-id } -- 這個下面保存的是每個 topology 的 assignments 信息包括: 對應的
| nimbus 上的代碼目錄, 所有 task 的啟動時間, 每個 task 與機器、端口的映射
| 操作為( assignments) 來獲取所有的 assignments 的值;
| 以及( assignment-info storm-id) 來得到給定的 storm-id 對應的 AssignmentInfo 信息
| 在 AssignmentInfo 中存儲的內容有:
| :executor->node+port :executor->start-time-secs :node->host
| 具體定義在 common.clj 中的
| ( defrecord Assignment [ master-code-dir node->host executor->node+port executor->start-time-secs ])
|
| -/storms -- 這個目錄保存所有正在運行的topology的id
| |
| |
| | -/ { topology-id } -- 這個文件保存這個 topology 的一些信息,包括 topology 的
| 名字, topology 開始運行的時間以及這個 topology 的狀態
| 操作( active-storms), 獲得當前路徑下活躍的 topology 數據。保存的內容參考類 StormBase
| ( storm-base storm-id) 得到給定的 storm-id 下的 StormBase 數據, 具體定義在 common.clj 中的
| ( defrecord StormBase [ storm-name launch-time-secs status num-workers component->executors ])
|
| -/supervisors -- 這個目錄保存所有的 supervisor的心跳信息
| |
| |
| | -/ { supervisor-id } -- 這個文件保存的是 supervisor 的心跳信息包括:心跳時間,主
| 機名,這個 supervisor 上 worker 的端口號運行時間( 具體看 SupervisorInfo 類)
| 操作( supervisors) 得到所有的 supervisors 節點
| ( supervisor-info supervisor-id) 得到給定的 supervisor-id 對應的 SupervisorInfo 信息
| 具體定義在 common.clj 中的
| ( defrecord SupervisorInfo [ time-secs hostname assignment-id used-ports meta scheduler-meta uptime-secs ])
|
| -/workerbeats -- 所有 worker 的心跳
| |
| | -/ { topology-id } -- 這個目錄保存這個 topology 的所有的 worker 的心跳信息
| |
| | -/ { supervisorId-port } -- worker 的心跳信息,包括心跳的時
| 間, worker 運行時間以及一些統計信息
| 操作( heartbeat-storms) 得到所有有心跳數據的 topology ,
| ( get-worker-heartbeat storm-id node port) 得到具體一個 topology 下的某個 worker( node :port) 的心跳狀況,
| ( executor-beats storm-id executor->node+port) 得到一個 executor 的心跳狀況
|
| -/errors -- 所有產生的 error 信息
|
| -/ { topology-id } -- 這個目錄保存這個 topology 下面的錯誤信息
| 操作( error-topologies) 得到出錯的 topology
| ( errors storm-id component-id) 得到給定的 storm-id component-id 下的出錯信息
| -/ { component-id }