Nimbus Server
Nimbus server, 首先從啟動命令開始, 同樣是使用storm命令"storm nimbus”來啟動
看下源碼, 此處和上面client不同, jvmtype="-server", 最終調用"backtype.storm.daemon.nimbus"的main
nimbus是用clojure實現的, 但是clojure是基於JVM的, 所以在最終發布的時候會產生nimbus.class, 所以在用戶使用的時候完全可以不知道clojure, 看上去所有都是Java
clojure只是用於提高開發效率而已.
def nimbus(): """Syntax: [storm nimbus] Launches the nimbus daemon. This command should be run under supervision with a tool like daemontools or monit. See Setting up a Storm cluster for more information. (https://github.com/nathanmarz/storm/wiki/Setting-up-a-Storm-cluster) """ cppaths = [STORM_DIR + "/log4j", STORM_DIR + "/conf"] childopts = confvalue("nimbus.childopts", cppaths) + " -Dlogfile.name=nimbus.log -Dlog4j.configuration=storm.log.properties" exec_storm_class( "backtype.storm.daemon.nimbus", jvmtype="-server", extrajars=cppaths, childopts=childopts)
launch-server!
來看看nimbus的main, 最終會調到launch-server!, conf參數是調用read-storm-config讀出的配置參數,
而nimbus是INimbus接口(backtype.storm.scheduler.INimbus)的實現, 可以參考standalone-nimbus
(defn -main [] (-launch (standalone-nimbus)))
(defn -launch [nimbus] (launch-server! (read-storm-config) nimbus)) (defn launch-server! [conf nimbus] (validate-distributed-mode! conf) (let [service-handler (service-handler conf nimbus) options (-> (TNonblockingServerSocket. (int (conf NIMBUS-THRIFT-PORT))) (THsHaServer$Args.) (.workerThreads 64) (.protocolFactory (TBinaryProtocol$Factory.)) (.processor (Nimbus$Processor. service-handler)) ) server (THsHaServer. options)] (.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.shutdown service-handler) (.stop server)))) (log-message "Starting Nimbus server...") (.serve server)))
1. service-handler
首先定義service-handler, service-handler前面的定義如下
(defserverfn service-handler [conf inimbus] (reify Nimbus$Iface ...) )
這邊用到一個macro定義defserverfn, 如下
(defmacro defserverfn [name & body] `(let [exec-fn# (fn ~@body)] (defn ~name [& args#]0 (try-cause (apply exec-fn# args#) (catch InterruptedException e# (throw e#)) (catch Throwable t# (log-error t# "Error on initialization of server " ~(str name)) (halt-process! 13 "Error on initialization") )))))
定義匿名函數 fn[conf inimbus] (……)
定義函數defn service-handler [& args], 里面只是簡單的調用fn…使用這個macro和直接定義defn service-handler [conf inimbus]幾乎沒有啥區別
我有個疑問, 為什么要定義這個無聊的macro, 難道就是為了便於后面的exception處理
2. server
其中processor, 是server上主要的處理進程, 使用傳入的service-handler進行數據處理
Nimbus$Iface
Nimbus server已經啟動, 剩下就是處理從client傳來的RPC調用, 關鍵就是Nimbus$Iface的實現
在下面的實現中總是用到nimbus這個變量, nimbus-data, 用於存放nimbus相關配置和全局的參數
let [nimbus (nimbus-data conf inimbus)]
(defn nimbus-data [conf inimbus] (let [forced-scheduler (.getForcedScheduler inimbus)] {:conf conf :inimbus inimbus :submitted-count (atom 0) ;記錄多少topology被submit :storm-cluster-state (cluster/mk-storm-cluster-state conf) ;抽象Zookeeper接口(Zookeeper用於存放cluster state) :submit-lock (Object.) ;創建鎖對象,用於各個topology之間的互斥操作, 比如建目錄 :heartbeats-cache (atom {}) ;記錄各個Topology的heartbeats的cache :downloaders (file-cache-map conf) :uploaders (file-cache-map conf) :uptime (uptime-computer) :validator (new-instance (conf NIMBUS-TOPOLOGY-VALIDATOR)) :timer (mk-timer :kill-fn (fn [t] (log-error t "Error when processing event") (halt-process! 20 "Error when processing an event") )) :scheduler (mk-scheduler conf inimbus) }))
接着重點看下submitTopology,
4個參數,
^String storm-name, storm名字
^String uploadedJarLocation, 上傳Jar的目錄
^String serializedConf, 序列化過的Conf信息
^StormTopology topology, topology對象(thrift對象), 由topologyBuilder產生
(^void submitTopology [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology] (try (validate-topology-name! storm-name) ;;名字起的是否符合規范 (check-storm-active! nimbus storm-name false) ;;check是否active (.validate ^backtype.storm.nimbus.ITopologyValidator (:validator nimbus) ;;調用用戶定義的validator.validate storm-name (from-json serializedConf) topology) (swap! (:submitted-count nimbus) inc) ;;submitted-count加1, 表示nimbus上submit的topology的數量 (let [storm-id (str storm-name "-" @(:submitted-count nimbus) "-" (current-time-secs)) ;;生成storm-id storm-conf (normalize-conf ;;轉化成json,增加kv,最終生成storm-conf conf (-> serializedConf from-json (assoc STORM-ID storm-id) (assoc TOPOLOGY-NAME storm-name)) topology) total-storm-conf (merge conf storm-conf) topology (normalize-topology total-storm-conf topology) ;;規范化的topology對象 topology (if (total-storm-conf TOPOLOGY-OPTIMIZE) (optimize-topology topology) topology) storm-cluster-state (:storm-cluster-state nimbus)] ;;操作zk的interface (system-topology! total-storm-conf topology) ;; this validates the structure of the topology, 1. System-topology! (log-message "Received topology submission for " storm-name " with conf " storm-conf) ;; lock protects against multiple topologies being submitted at once and ;; cleanup thread killing topology in b/w assignment and starting the topology (locking (:submit-lock nimbus) (setup-storm-code conf storm-id uploadedJarLocation storm-conf topology) ;;2. 建立topology的本地目錄 (.setup-heartbeats! storm-cluster-state storm-id) ;;3. 建立Zookeeper heartbeats (start-storm nimbus storm-name storm-id) ;;4. start-storm (mk-assignments nimbus))) ;;5. mk-assignments (catch Throwable e (log-warn-error e "Topology submission exception. (topology name='" storm-name "')") (throw e))))
1. System-topology!
Validate Topology, 比如使用的comonentid, steamid是否合法
添加系統所需要的component, 比如acker等, 不過沒有用到, 不知道為什么要調用System-topology!
(system-topology! total-storm-conf topology) ;; this validates the structure of the topology (defn system-topology! [storm-conf ^StormTopology topology] (validate-basic! topology) (let [ret (.deepCopy topology)] (add-acker! storm-conf ret) (add-metric-components! storm-conf ret) (add-system-components! storm-conf ret) (add-metric-streams! ret) (add-system-streams! ret) (validate-structure! ret) ret ))
2. 建立topology的本地目錄 (這步開始需要lock互斥)
Jars and configs are kept on local filesystem because they're too big for Zookeeper. The jar and configs are copied into the path {nimbus local dir}/stormdist/{topology id}
(setup-storm-code conf storm-id uploadedJarLocation storm-conf topology)
借用這張圖, 比較清晰, 先創建目錄, 並將Jar move到當前目錄
再將topology對象和conf對象都序列化保存到目錄中
3. 建立Zookeeper heartbeats
就是按照下面圖示在Zookeeper建立topology的心跳目錄
(.setup-heartbeats! storm-cluster-state storm-id)
(setup-heartbeats! [this storm-id] (mkdirs cluster-state (workerbeat-storm-root storm-id))) (defn mkdirs [^CuratorFramework zk ^String path] (let [path (normalize-path path)] (when-not (or (= path "/") (exists-node? zk path false)) (mkdirs zk (parent-path path)) (try-cause (create-node zk path (barr 7) :persistent) (catch KeeperException$NodeExistsException e ;; this can happen when multiple clients doing mkdir at same time )) )))

4. start-storm, 產生StormBase
雖然叫做start-storm, 其實做的事情只是把StormBase結構序列化並放到zookeeper上
這個StormBase和topology對象有什么區別,
topology對象, topology的靜態信息, 包含components的詳細信息和之間的拓撲關系, 內容比較多所以存儲在磁盤上stormcode.ser
而StormBase, topology的動態信息, 只記錄了launch時間, status, worker數, component的executor數運行態數據, 比較mini, 所以放在zk上
(defn- start-storm [nimbus storm-name storm-id] (let [storm-cluster-state (:storm-cluster-state nimbus) conf (:conf nimbus) storm-conf (read-storm-conf conf storm-id) topology (system-topology! storm-conf (read-storm-topology conf storm-id)) num-executors (->> (all-components topology) (map-val num-start-executors))] (log-message "Activating " storm-name ": " storm-id) (.activate-storm! storm-cluster-state storm-id (StormBase. storm-name (current-time-secs) {:type :active} (storm-conf TOPOLOGY-WORKERS) num-executors))))
;; component->executors is a map from spout/bolt id to number of executors for that component (defrecord StormBase [storm-name launch-time-secs status num-workers component->executors])
struct ComponentCommon { 1: required map<GlobalStreamId, Grouping> inputs; 2: required map<string, StreamInfo> streams; //key is stream id 3: optional i32 parallelism_hint; //how many threads across the cluster should be dedicated to this component 4: optional string json_conf; }
重上面可以看出StormBase是定義的一個record, 包含storm-name, 當前時間戳, topology的初始狀態(active或inactive), worker數目, 和executor的數目
其中計算num-executors, 使用->>, 其實等於(map-val num-start-executors (all-components topology)), map-value就是對(k,v)中的value執行num-start-executors, 而這個函數其實就是去讀ComponentCommon里面的parallelism_hint, 所以num-executors, 描述每個component需要幾個executors(線程)
(activate-storm! [this storm-id storm-base] (set-data cluster-state (storm-path storm-id) (Utils/serialize storm-base)) )
(defn storm-path [id] (str STORMS-SUBTREE "/" id)) ;/storms/id
(defn set-data [^CuratorFramework zk ^String path ^bytes data] (.. zk (setData) (forPath (normalize-path path) data)))
最終調用activate-storm!將storm-base序列化后的數據存到Zookeeper的"/storms/id”目錄下
5. mk-assignments