Storm在zookeeper上的目錄結構


storm操作zookeeper的主要函數都定義在命名空間backtype.storm.cluster中(即cluster.clj文件中)。 backtype.storm.cluster定義了兩個重要protocol:ClusterState和StormClusterState。clojure中的protocol可以看成java中的接口,封裝了一組方法。ClusterState協議中封裝了一組與zookeeper進行交互的基礎函數,如獲取子節點函數,獲取子節點數據函數等,ClusterState協議定義如下:

關於storm操作zookeeper的詳細分析請參見博客:storm操作zookeeper源碼分析-cluster.clj

 
Zookeeper的操作 

 

( defprotocol ClusterState
 ( set-ephemeral-node [ this path data ])
 ( delete-node [ this path ])
 ( create-sequential [ this path data ])
  ;; if node does not exist, create persistent with this data
 ( set-data [ this path data ])
 ( get-data [ this path watch? ])
 ( get-version [ this path watch? ])
 ( get-data-with-version [ this path watch? ])
 ( get-children [ this path watch? ])
 ( mkdirs [ this path ])
 ( close [ this ])
 ( register [ this callback ])
 ( unregister [ this id ]))
 
Storm使用Zookeeper的操作 

 

( defprotocol StormClusterState
 ( assignments [ this callback ])
 ( assignment-info [ this storm-id callback ])
 ( assignment-info-with-version [ this storm-id callback ])
 ( assignment-version [ this storm-id callback ])
 ( active-storms [ this ])
 ( storm-base [ this storm-id callback ])
 ( get-worker-heartbeat [ this storm-id node port ])
 ( executor-beats [ this storm-id executor->node+port ])
 ( supervisors [ this callback ])
 ( supervisor-info [ this supervisor-id ]) ;; returns nil if doesn't exist
 ( setup-heartbeats! [ this storm-id ])
 ( teardown-heartbeats! [ this storm-id ])
 ( teardown-topology-errors! [ this storm-id ])
 ( heartbeat-storms [ this ])
 ( error-topologies [ this ])
 ( worker-heartbeat! [ this storm-id node port info ])
 ( remove-worker-heartbeat! [ this storm-id node port ])
 ( supervisor-heartbeat! [ this supervisor-id info ])
 ( activate-storm! [ this storm-id storm-base ])
 ( update-storm! [ this storm-id new-elems ])
 ( remove-storm-base! [ this storm-id ])
 ( set-assignment! [ this storm-id info ])
 ( remove-storm! [ this storm-id ])
 ( report-error [ this storm-id task-id node port error ])
 ( errors [ this storm-id task-id ])
 ( disconnect [ this ]))
 
Storm中在Zookeeper中存儲的目錄

 

( def ASSIGNMENTS-ROOT "assignments")
( def CODE-ROOT "code")
( def STORMS-ROOT "storms")
( def SUPERVISORS-ROOT "supervisors")
( def WORKERBEATS-ROOT "workerbeats")
( def ERRORS-ROOT "errors")

( def ASSIGNMENTS-SUBTREE ( str "/" ASSIGNMENTS-ROOT))
( def STORMS-SUBTREE ( str "/" STORMS-ROOT))
( def SUPERVISORS-SUBTREE ( str "/" SUPERVISORS-ROOT))
( def WORKERBEATS-SUBTREE ( str "/" WORKERBEATS-ROOT))
( def ERRORS-SUBTREE ( str "/" ERRORS-ROOT))

從上面來看,在Zookeeper中主要是有如下的五個子目錄:

  1. /assignments -> 任務分配信息
  2. /storms -> 正在運行的topology的ID
  3. /supervisors -> 所有的Supervisors的心跳信息
  4. /workerbeats -> 所有的Worker的心跳
  5. /errors -> 產生的出錯信息
 
結構圖

 

/- { storm-zk-root }           -- storm zookeeper 上的根目錄( 默認為 /storm)
 |
 | -/assignments             -- topology 的任務分配信息
 |   |
 |   | -/ { topology-id }       -- 這個下面保存的是每個 topology assignments 信息包括: 對應的
 |                             nimbus 上的代碼目錄, 所有 task 的啟動時間, 每個 task 與機器、端口的映射
 |                             操作為( assignments) 來獲取所有的 assignments 的值;
 |                             以及( assignment-info storm-id) 來得到給定的 storm-id 對應的 AssignmentInfo 信息
 |                             AssignmentInfo 中存儲的內容有:
 |                             :executor->node+port :executor->start-time-secs :node->host
 |                             具體定義在 common.clj 中的
 |                            ( defrecord Assignment [ master-code-dir node->host executor->node+port executor->start-time-secs ])
 |
 | -/storms                 -- 這個目錄保存所有正在運行的topology的id
 |   |                        
 |   |
 |   | -/ { topology-id }       -- 這個文件保存這個 topology 的一些信息,包括 topology
 |                             名字, topology 開始運行的時間以及這個 topology 的狀態
 |                             操作( active-storms), 獲得當前路徑下活躍的 topology 數據。保存的內容參考類 StormBase
 |                            ( storm-base storm-id) 得到給定的 storm-id 下的 StormBase 數據, 具體定義在 common.clj 中的
 |                            ( defrecord StormBase [ storm-name launch-time-secs status num-workers component->executors ])
 |
 | -/supervisors             -- 這個目錄保存所有的 supervisor的心跳信息
 |   |                        
 |   |
 |   | -/ { supervisor-id }     -- 這個文件保存的是 supervisor 的心跳信息包括:心跳時間,主
 |                             機名,這個 supervisor worker 的端口號運行時間( 具體看 SupervisorInfo )
 |                             操作( supervisors) 得到所有的 supervisors 節點
 |                            ( supervisor-info supervisor-id) 得到給定的 supervisor-id 對應的 SupervisorInfo 信息
 |                             具體定義在 common.clj 中的
 |                            ( defrecord SupervisorInfo [ time-secs hostname assignment-id used-ports meta scheduler-meta uptime-secs ])
 |
 | -/workerbeats                         -- 所有 worker 的心跳
 |   |
 |   | -/ { topology-id }                   -- 這個目錄保存這個 topology 的所有的 worker 的心跳信息
 |       |
 |       | -/ { supervisorId-port }         -- worker 的心跳信息,包括心跳的時
 |                                         間, worker 運行時間以及一些統計信息
 |                                         操作( heartbeat-storms) 得到所有有心跳數據的 topology
 |                                        ( get-worker-heartbeat storm-id node port) 得到具體一個 topology 下的某個 worker( node :port) 的心跳狀況,
 |                                        ( executor-beats storm-id executor->node+port) 得到一個 executor 的心跳狀況
 |
 | -/errors                       -- 所有產生的 error 信息
     |
     | -/ { topology-id }           -- 這個目錄保存這個 topology 下面的錯誤信息
         |                         操作( error-topologies) 得到出錯的 topology
         |                         ( errors storm-id component-id) 得到給定的 storm-id component-id 下的出錯信息
         | -/ { component-id }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM