storm操作zookeeper的主要函数都定义在命名空间backtype.storm.cluster中(即cluster.clj文件中)。 backtype.storm.cluster定义了两个重要protocol:ClusterState和StormClusterState。clojure中的protocol可以看成java中的接口,封装了一组方法。ClusterState协议中封装了一组与zookeeper进行交互的基础函数,如获取子节点函数,获取子节点数据函数等,ClusterState协议定义如下:
关于storm操作zookeeper的详细分析请参见博客:storm操作zookeeper源码分析-cluster.clj
Zookeeper的操作
(
defprotocol
ClusterState
( set-ephemeral-node [ this path data ])
( delete-node [ this path ])
( create-sequential [ this path data ])
;; if node does not exist, create persistent with this data
( set-data [ this path data ])
( get-data [ this path watch? ])
( get-version [ this path watch? ])
( get-data-with-version [ this path watch? ])
( get-children [ this path watch? ])
( mkdirs [ this path ])
( close [ this ])
( register [ this callback ])
( unregister [ this id ]))
( set-ephemeral-node [ this path data ])
( delete-node [ this path ])
( create-sequential [ this path data ])
;; if node does not exist, create persistent with this data
( set-data [ this path data ])
( get-data [ this path watch? ])
( get-version [ this path watch? ])
( get-data-with-version [ this path watch? ])
( get-children [ this path watch? ])
( mkdirs [ this path ])
( close [ this ])
( register [ this callback ])
( unregister [ this id ]))
Storm使用Zookeeper的操作
(
defprotocol
StormClusterState
( assignments [ this callback ])
( assignment-info [ this storm-id callback ])
( assignment-info-with-version [ this storm-id callback ])
( assignment-version [ this storm-id callback ])
( active-storms [ this ])
( storm-base [ this storm-id callback ])
( get-worker-heartbeat [ this storm-id node port ])
( executor-beats [ this storm-id executor->node+port ])
( supervisors [ this callback ])
( supervisor-info [ this supervisor-id ]) ;; returns nil if doesn't exist
( setup-heartbeats! [ this storm-id ])
( teardown-heartbeats! [ this storm-id ])
( teardown-topology-errors! [ this storm-id ])
( heartbeat-storms [ this ])
( error-topologies [ this ])
( worker-heartbeat! [ this storm-id node port info ])
( remove-worker-heartbeat! [ this storm-id node port ])
( supervisor-heartbeat! [ this supervisor-id info ])
( activate-storm! [ this storm-id storm-base ])
( update-storm! [ this storm-id new-elems ])
( remove-storm-base! [ this storm-id ])
( set-assignment! [ this storm-id info ])
( remove-storm! [ this storm-id ])
( report-error [ this storm-id task-id node port error ])
( errors [ this storm-id task-id ])
( disconnect [ this ]))
( assignments [ this callback ])
( assignment-info [ this storm-id callback ])
( assignment-info-with-version [ this storm-id callback ])
( assignment-version [ this storm-id callback ])
( active-storms [ this ])
( storm-base [ this storm-id callback ])
( get-worker-heartbeat [ this storm-id node port ])
( executor-beats [ this storm-id executor->node+port ])
( supervisors [ this callback ])
( supervisor-info [ this supervisor-id ]) ;; returns nil if doesn't exist
( setup-heartbeats! [ this storm-id ])
( teardown-heartbeats! [ this storm-id ])
( teardown-topology-errors! [ this storm-id ])
( heartbeat-storms [ this ])
( error-topologies [ this ])
( worker-heartbeat! [ this storm-id node port info ])
( remove-worker-heartbeat! [ this storm-id node port ])
( supervisor-heartbeat! [ this supervisor-id info ])
( activate-storm! [ this storm-id storm-base ])
( update-storm! [ this storm-id new-elems ])
( remove-storm-base! [ this storm-id ])
( set-assignment! [ this storm-id info ])
( remove-storm! [ this storm-id ])
( report-error [ this storm-id task-id node port error ])
( errors [ this storm-id task-id ])
( disconnect [ this ]))
Storm中在Zookeeper中存储的目录
(
def
ASSIGNMENTS-ROOT
"assignments")
( def CODE-ROOT "code")
( def STORMS-ROOT "storms")
( def SUPERVISORS-ROOT "supervisors")
( def WORKERBEATS-ROOT "workerbeats")
( def ERRORS-ROOT "errors")
( def ASSIGNMENTS-SUBTREE ( str "/" ASSIGNMENTS-ROOT))
( def STORMS-SUBTREE ( str "/" STORMS-ROOT))
( def SUPERVISORS-SUBTREE ( str "/" SUPERVISORS-ROOT))
( def WORKERBEATS-SUBTREE ( str "/" WORKERBEATS-ROOT))
( def ERRORS-SUBTREE ( str "/" ERRORS-ROOT))
( def CODE-ROOT "code")
( def STORMS-ROOT "storms")
( def SUPERVISORS-ROOT "supervisors")
( def WORKERBEATS-ROOT "workerbeats")
( def ERRORS-ROOT "errors")
( def ASSIGNMENTS-SUBTREE ( str "/" ASSIGNMENTS-ROOT))
( def STORMS-SUBTREE ( str "/" STORMS-ROOT))
( def SUPERVISORS-SUBTREE ( str "/" SUPERVISORS-ROOT))
( def WORKERBEATS-SUBTREE ( str "/" WORKERBEATS-ROOT))
( def ERRORS-SUBTREE ( str "/" ERRORS-ROOT))
从上面来看,在Zookeeper中主要是有如下的五个子目录:
- /assignments -> 任务分配信息
- /storms -> 正在运行的topology的ID
- /supervisors -> 所有的Supervisors的心跳信息
- /workerbeats -> 所有的Worker的心跳
- /errors -> 产生的出错信息
结构图
/-
{
storm-zk-root
}
--
storm
在
zookeeper
上的根目录(
默认为
/storm)
|
| -/assignments -- topology 的任务分配信息
| |
| | -/ { topology-id } -- 这个下面保存的是每个 topology 的 assignments 信息包括: 对应的
| nimbus 上的代码目录, 所有 task 的启动时间, 每个 task 与机器、端口的映射
| 操作为( assignments) 来获取所有的 assignments 的值;
| 以及( assignment-info storm-id) 来得到给定的 storm-id 对应的 AssignmentInfo 信息
| 在 AssignmentInfo 中存储的内容有:
| :executor->node+port :executor->start-time-secs :node->host
| 具体定义在 common.clj 中的
| ( defrecord Assignment [ master-code-dir node->host executor->node+port executor->start-time-secs ])
|
| -/storms -- 这个目录保存所有正在运行的topology的id
| |
| |
| | -/ { topology-id } -- 这个文件保存这个 topology 的一些信息,包括 topology 的
| 名字, topology 开始运行的时间以及这个 topology 的状态
| 操作( active-storms), 获得当前路径下活跃的 topology 数据。保存的内容参考类 StormBase
| ( storm-base storm-id) 得到给定的 storm-id 下的 StormBase 数据, 具体定义在 common.clj 中的
| ( defrecord StormBase [ storm-name launch-time-secs status num-workers component->executors ])
|
| -/supervisors -- 这个目录保存所有的 supervisor的心跳信息
| |
| |
| | -/ { supervisor-id } -- 这个文件保存的是 supervisor 的心跳信息包括:心跳时间,主
| 机名,这个 supervisor 上 worker 的端口号运行时间( 具体看 SupervisorInfo 类)
| 操作( supervisors) 得到所有的 supervisors 节点
| ( supervisor-info supervisor-id) 得到给定的 supervisor-id 对应的 SupervisorInfo 信息
| 具体定义在 common.clj 中的
| ( defrecord SupervisorInfo [ time-secs hostname assignment-id used-ports meta scheduler-meta uptime-secs ])
|
| -/workerbeats -- 所有 worker 的心跳
| |
| | -/ { topology-id } -- 这个目录保存这个 topology 的所有的 worker 的心跳信息
| |
| | -/ { supervisorId-port } -- worker 的心跳信息,包括心跳的时
| 间, worker 运行时间以及一些统计信息
| 操作( heartbeat-storms) 得到所有有心跳数据的 topology ,
| ( get-worker-heartbeat storm-id node port) 得到具体一个 topology 下的某个 worker( node :port) 的心跳状况,
| ( executor-beats storm-id executor->node+port) 得到一个 executor 的心跳状况
|
| -/errors -- 所有产生的 error 信息
|
| -/ { topology-id } -- 这个目录保存这个 topology 下面的错误信息
| 操作( error-topologies) 得到出错的 topology
| ( errors storm-id component-id) 得到给定的 storm-id component-id 下的出错信息
| -/ { component-id }
|
| -/assignments -- topology 的任务分配信息
| |
| | -/ { topology-id } -- 这个下面保存的是每个 topology 的 assignments 信息包括: 对应的
| nimbus 上的代码目录, 所有 task 的启动时间, 每个 task 与机器、端口的映射
| 操作为( assignments) 来获取所有的 assignments 的值;
| 以及( assignment-info storm-id) 来得到给定的 storm-id 对应的 AssignmentInfo 信息
| 在 AssignmentInfo 中存储的内容有:
| :executor->node+port :executor->start-time-secs :node->host
| 具体定义在 common.clj 中的
| ( defrecord Assignment [ master-code-dir node->host executor->node+port executor->start-time-secs ])
|
| -/storms -- 这个目录保存所有正在运行的topology的id
| |
| |
| | -/ { topology-id } -- 这个文件保存这个 topology 的一些信息,包括 topology 的
| 名字, topology 开始运行的时间以及这个 topology 的状态
| 操作( active-storms), 获得当前路径下活跃的 topology 数据。保存的内容参考类 StormBase
| ( storm-base storm-id) 得到给定的 storm-id 下的 StormBase 数据, 具体定义在 common.clj 中的
| ( defrecord StormBase [ storm-name launch-time-secs status num-workers component->executors ])
|
| -/supervisors -- 这个目录保存所有的 supervisor的心跳信息
| |
| |
| | -/ { supervisor-id } -- 这个文件保存的是 supervisor 的心跳信息包括:心跳时间,主
| 机名,这个 supervisor 上 worker 的端口号运行时间( 具体看 SupervisorInfo 类)
| 操作( supervisors) 得到所有的 supervisors 节点
| ( supervisor-info supervisor-id) 得到给定的 supervisor-id 对应的 SupervisorInfo 信息
| 具体定义在 common.clj 中的
| ( defrecord SupervisorInfo [ time-secs hostname assignment-id used-ports meta scheduler-meta uptime-secs ])
|
| -/workerbeats -- 所有 worker 的心跳
| |
| | -/ { topology-id } -- 这个目录保存这个 topology 的所有的 worker 的心跳信息
| |
| | -/ { supervisorId-port } -- worker 的心跳信息,包括心跳的时
| 间, worker 运行时间以及一些统计信息
| 操作( heartbeat-storms) 得到所有有心跳数据的 topology ,
| ( get-worker-heartbeat storm-id node port) 得到具体一个 topology 下的某个 worker( node :port) 的心跳状况,
| ( executor-beats storm-id executor->node+port) 得到一个 executor 的心跳状况
|
| -/errors -- 所有产生的 error 信息
|
| -/ { topology-id } -- 这个目录保存这个 topology 下面的错误信息
| 操作( error-topologies) 得到出错的 topology
| ( errors storm-id component-id) 得到给定的 storm-id component-id 下的出错信息
| -/ { component-id }