storm啟動nimbus源碼分析-nimbus.clj


nimbus是storm集群的"控制器",是storm集群的重要組成部分。我們可以通用執行bin/storm nimbus >/dev/null 2>&1 &來啟動nimbus。bin/storm是一個python腳本,在這個腳本中定義了一個nimbus函數:

 

nimbus函數

 

def nimbus( klass = "backtype.storm.daemon.nimbus" ):
    """Syntax: [storm nimbus]

   Launches the nimbus daemon. This command should be run under
   supervision with a tool like daemontools or monit.

   See Setting up a Storm cluster for more information.
   (https://github.com/nathanmarz/storm/wiki/Setting-up-a-Storm-cluster)
   """
    cppaths = [ STORM_DIR + "/log4j" , STORM_DIR + "/conf" ]
    jvmopts = parse_args( confvalue( "nimbus.childopts" , cppaths)) + [
        "-Dlogfile.name=nimbus.log" ,
        "-Dlog4j.configuration=storm.log.properties" ,
    ]
    exec_storm_class(
        klass ,
        jvmtype = "-server" ,
        extrajars = cppaths ,
        jvmopts = jvmopts)

klass參數的默認值為backtype.storm.daemon.nimbus,backtype.storm.daemon.nimbus標識一個java類。STORM_DIR標識storm的安裝目錄,cppaths集合存放了log4j配置文件路徑和storm配置文件storm.yaml路徑,jvmopts存放傳遞給jvm的參數,包括log4j配文件路徑、storm.yaml路徑、log4j日志名稱和log4j配置文件名稱。

exec_storm_class函數的邏輯比較簡單,具體實現如下:

exec_storm_class函數

 

def exec_storm_class( klass , jvmtype = "-server" , jvmopts = [], extrajars = [], args = [], fork = False ):  
    global CONFFILE  
    all_args = [  
        "java" , jvmtype , get_config_opts (),  
        "-Dstorm.home=" + STORM_DIR ,  
        "-Djava.library.path=" + confvalue( "java.library.path" , extrajars ),  
        "-Dstorm.conf.file=" + CONFFILE ,  
        "-cp" , get_classpath( extrajars ),  
    ] + jvmopts + [ klass ] + list( args)  
    print "Running: " + " " . join( all_args)  
    if fork :  
        os . spawnvp( os . P_WAIT , "java" , all_args)  
    else :  
        os . execvp( "java" , all_args) # replaces the current process and never returns

get_config_opts()獲取jvm的默認配置信息,confvalue("java.library.path", extrajars)獲取storm使用的本地庫JZMQ加載路徑,get_classpath(extrajars)獲取所有依賴jar包的完整路徑,然后拼接一個java -cp命令運行klass的main方法。
klass默認值為backtype.storm.daemon.nimbus,所以exec_storm_class函數最終調用backtype.storm.daemon.nimbus類的main方法。

backtype.storm.daemon.nimbus類定義在nimbus.clj文件中,定義如下:

backtype.storm.daemon.nimbus類

 

(ns backtype.storm.daemon.nimbus
 ( :import [ org.apache.thrift.server THsHaServer THsHaServer$Args ])
 ( :import [ org.apache.thrift.protocol TBinaryProtocol TBinaryProtocol$Factory ])
 ( :import [ org.apache.thrift.exception ])
 ( :import [ org.apache.thrift.transport TNonblockingServerTransport TNonblockingServerSocket ])
 ( :import [ java.nio ByteBuffer ])
 ( :import [ java.io FileNotFoundException ])
 ( :import [ java.nio.channels Channels WritableByteChannel ])
 ( :use [ backtype.storm.scheduler.DefaultScheduler ])
 ( :import [ backtype.storm.scheduler INimbus SupervisorDetails WorkerSlot TopologyDetails
            Cluster Topologies SchedulerAssignment SchedulerAssignmentImpl DefaultScheduler ExecutorDetails ])
 ( :use [ backtype.storm bootstrap util ])
 ( :use [ backtype.storm.config :only [ validate-configs-with-schemas ]])
 ( :use [ backtype.storm.daemon common ])
 ( :gen-class
    :methods [ ^ { :static true } [ launch [ backtype.storm.scheduler.INimbus ] void ]]))
    ...
    ;; 其他方法
    ...
   ( defn -main []
 ( -launch ( standalone-nimbus)))

:gen-class指示Clojure生成Java類backtype.storm.daemon.nimbus,並且聲明一個靜態方法launch,launch方法接收一個實現backtype.storm.scheduler.INimbus接口的實例作為參數。launch函數的參數是由standalone-nimbus函數生成的。standalone-nimbus函數定義如下:返回一個實現INimbus接口的實例。

standalone-nimbus函數

 

( defn standalone-nimbus []
  ;; 實現INimbus接口
 ( reify INimbus
    ;; prepare函數為空實現
   ( prepare [ this conf local-dir ]
     )
    ;; allSlotsAvailableForScheduling獲取所有可用的slot集合
   ( allSlotsAvailableForScheduling [ this supervisors topologies topologies-missing-assignments ]
      ;; supervisors標識集群所有supervisor的詳細信息對象SupervisorDetails的集合
     ( ->> supervisors
          ;; 遍歷supervisors,為supervisor的每個port生成對應的WorkerSlot對象,WorkerSlot包含兩個屬性節點id和port
          ( mapcat ( fn [ ^ SupervisorDetails s ]
                    ( for [p ( .getMeta s )]
                      ( WorkerSlot. ( .getId s) p))))
          set ))
   ( assignSlots [ this topology slots ]
     )
   ( getForcedScheduler [ this ]
      nil )
    ;; 獲取supervisor主機名
   ( getHostName [ this supervisors node-id ]
     ( if-let [ ^ SupervisorDetails supervisor ( get supervisors node-id )]
       ( .getHost supervisor)))
   ))

launch函數定義如下:

launch函數

 

( defn -launch [ nimbus ]
 ;;
  ;; read-storm-config函數用於讀取storm集群的配置信息,參見其定義部分
 ( launch-server! ( read-storm-config) nimbus))
launch-server! 函數定義如下:  
( defn launch-server! [ conf nimbus ]
  ;; 判斷是否是分布式模式,如果是本地模式則拋出IllegalArgumentException
 ( validate-distributed-mode! conf)
  ;; service-handler函數是由宏defserverfn定義的,返回一個實現了Nimbus類中的Iface接口的實例,Nimbus類是由thrift框架自動生成的,Iface接口封裝了service Nimbus的全部接口。
  ;; nimbus thrift server端提供的接口服務都是由這個實例實現的。service-handler函數參見其定義部分,service Nimbus參見storm.thrift
  ;; service-handler綁定實現了Nimbus類中的Iface接口的實例
 ( let [ service-handler ( service-handler conf nimbus)
        options ( -> ( TNonblockingServerSocket. ( int ( conf NIMBUS-THRIFT-PORT)))
                   ( THsHaServer$Args.)
                   ( .workerThreads 64)
                   ( .protocolFactory ( TBinaryProtocol$Factory. false true ( conf NIMBUS-THRIFT-MAX-BUFFER-SIZE)))
                   ( .processor ( Nimbus$Processor. service-handler))
                   )
      server ( THsHaServer. ( do ( set! ( . options maxReadBufferBytes)( conf NIMBUS-THRIFT-MAX-BUFFER-SIZE)) options ))]
   ( add-shutdown-hook-with-force-kill-in-1-sec ( fn []
                                                 ( .shutdown service-handler)
                                                 ( .stop server)))
   ( log-message "Starting Nimbus server...")
   ( .serve server)))

read-storm-config定義如下: 

read-storm-config函數

 

( defn read-storm-config  
  []  
  ;; conf綁定storm集群配置信息
 ( let [ conf ( clojurify-structure ( Utils/readStormConfig ))]  
    ;; validate-configs-with-schemas函數驗證配置信息的正確性並刪除不正確的配置信息
   ( validate-configs-with-schemas conf)  
    conf))

read-storm-config函數調用了backtype.storm.utils.Utils類的靜態方法readStormConfig,如下:

readStormConfig方法

 

public static Map readStormConfig() {  
        // 調用readDefaultConfig從defaults.yaml配置文件讀取默認配置信息存入ret
        Map ret = readDefaultConfig();
        // 獲取用戶自定義配置文件路徑
        String confFile = System . getProperty( "storm.conf.file");  
        Map storm;  
        if ( confFile == null || confFile . equals( "")) {  
            storm = findAndReadConfigFile( "storm.yaml" , false);  
        } else {
            // 讀取用戶自定義配置信息
            storm = findAndReadConfigFile( confFile , true);  
        }  
        // 將用戶自定義的配置信息覆蓋更新到ret中
        ret . putAll( storm);
        // 將命令行方式提供的配置信息覆蓋更新到ret中
        ret . putAll( readCommandLineOpts());  
        // 返回覆蓋更新后的配置信息ret
        return ret;  
    }

service-handler函數定義如下:

defserverfn是一個宏,(defserverfn service-handler [conf inimbus] ... )返回一個名字為service-handler函數。宏擴展是在編譯時進行的

service-handler函數

 

( defserverfn service-handler [ conf inimbus ]
  ;; 調用inimbus的prepare方法,inimbus是standalone-nimbus函數返回的實現INimbus接口的實例,當前版本prepare方法為空實現
 ( .prepare inimbus conf ( master-inimbus-dir conf))
  ;; 打印日志信息
 ( log-message "Starting Nimbus with conf " conf)
  ;; nimbus綁定了一個map,這個map保存了nimbus端所必需的"屬性",詳見nimbus-data函數定義部分
 ( let [ nimbus ( nimbus-data conf inimbus )]
    ;; 調用nimbus這個map中保存的backtype.storm.nimbus.DefaultTopologyValidator對象的prepare方法,通過查看backtype.storm.nimbus.DefaultTopologyValidator類,我們可以發現prepare默認為空實現
   ( .prepare ^ backtype.storm.nimbus.ITopologyValidator ( :validator nimbus) conf)
    ;; cleanup-corrupt-topologies!函數的主要功能就是將在nimbus服務器{storm.local.dir}/nimbus/stormdist/路徑中不存在的topology id從zookeeper的/storms/路徑中刪除,即刪除在nimbus服務器上缺失jar包、topology信息和配置信息的當前正在運行的topology,
    ;; cleanup-corrupt-topologies!函數參見其定義部分
   ( cleanup-corrupt-topologies! nimbus)
    ;; 更新當前storm集群上topology的狀態
   ( doseq [ storm-id ( .active-storms ( :storm-cluster-state nimbus ))]
      ;; transition!函數主要功能就是負責topology狀態轉換,規定了當topology由一種狀態轉換成另一種新狀態時,需要做哪些處理操作,參見其定義部分
     ( transition! nimbus storm-id :startup))
    ;; 通過schedule-recurring函數向storm定時器添加了一個"周期任務"檢查心跳,重新分配任務,清理不活躍的topology,mk-assignments函數的主要功能就是檢查心跳和重新分配任務。關於storm定時器詳細分析請見"storm定時器timer源碼分析";關於mk-assignments函數請見"storm任務分配源碼分析"
    ;; do-cleanup函數主要功能就是清理不活躍的topology,請參加其定義部分
   ( schedule-recurring ( :timer nimbus)
                        0
                       ( conf NIMBUS-MONITOR-FREQ-SECS)
                       ( fn []
                         ( when ( conf NIMBUS-REASSIGN)
                           ( locking ( :submit-lock nimbus)
                             ( mk-assignments nimbus)))
                         ( do-cleanup nimbus)
                         ))
    ;; Schedule Nimbus inbox cleaner
    ;; 通過schedule-recurring函數向storm定時器添加一個"周期任務"刪除nimbus服務器上的過期jar包
   ( schedule-recurring ( :timer nimbus)
                        0
                       ( conf NIMBUS-CLEANUP-INBOX-FREQ-SECS)
                       ( fn []
                         ( clean-inbox ( inbox nimbus) ( conf NIMBUS-INBOX-JAR-EXPIRATION-SECS))
                         ))    
   ( reify Nimbus$Iface
      ;; submitTopologyWithOpts函數負責topology的提交,有關該函數的詳細分析請參見"storm源碼分析之topology提交過程"
     ( ^ void submitTopologyWithOpts
        [ this ^ String storm-name ^ String uploadedJarLocation ^ String serializedConf ^ StormTopology topology
        ^ SubmitOptions submitOptions ]
       ( try
         ( assert ( not-nil? submitOptions))
         ( validate-topology-name! storm-name)
         ( check-storm-active! nimbus storm-name false)
         ( let [ topo-conf ( from-json serializedConf )]
           ( try
             ( validate-configs-with-schemas topo-conf)
             ( catch IllegalArgumentException ex
               ( throw ( InvalidTopologyException. ( .getMessage ex)))))
           ( .validate ^ backtype.storm.nimbus.ITopologyValidator ( :validator nimbus)
                      storm-name
                      topo-conf
                      topology))
         ( swap! ( :submitted-count nimbus) inc)
         ( let [ storm-id ( str storm-name "-" @( :submitted-count nimbus) "-" ( current-time-secs))
                storm-conf ( normalize-conf
                            conf
                           ( -> serializedConf
                                from-json
                               ( assoc STORM-ID storm-id)
                             ( assoc TOPOLOGY-NAME storm-name))
                            topology)
                total-storm-conf ( merge conf storm-conf)
                topology ( normalize-topology total-storm-conf topology)
                storm-cluster-state ( :storm-cluster-state nimbus )]
           ( system-topology! total-storm-conf topology) ;; this validates the structure of the topology
           ( log-message "Received topology submission for " storm-name " with conf " storm-conf)
            ;; lock protects against multiple topologies being submitted at once and
            ;; cleanup thread killing topology in b/w assignment and starting the topology
           ( locking ( :submit-lock nimbus)
             ( setup-storm-code conf storm-id uploadedJarLocation storm-conf topology)
             ( .setup-heartbeats! storm-cluster-state storm-id)
             ( let [ thrift-status->kw-status { TopologyInitialStatus/INACTIVE :inactive
                                              TopologyInitialStatus/ACTIVE :active }]
               ( start-storm nimbus storm-name storm-id ( thrift-status->kw-status ( .get_initial_status submitOptions))))
             ( mk-assignments nimbus)))
         ( catch Throwable e
           ( log-warn-error e "Topology submission exception. (topology name='" storm-name "')")
           ( throw e))))
      ;; submitTopology函數調用了submitTopologyWithOpts函數
     ( ^ void submitTopology
        [ this ^ String storm-name ^ String uploadedJarLocation ^ String serializedConf ^ StormTopology topology ]
       ( .submitTopologyWithOpts this storm-name uploadedJarLocation serializedConf topology
                                ( SubmitOptions. TopologyInitialStatus/ACTIVE)))
      ;; killTopology函數見名知意,調用了killTopologyWithOpts函數
     ( ^ void killTopology [ this ^ String name ]
       ( .killTopologyWithOpts this name ( KillOptions.)))
      ;; storm-name綁定kill的topology名稱,KillOptions是一個thrift數據結構,只有個屬性wait_secs,表示延遲多長時間執行kill
     ( ^ void killTopologyWithOpts [ this ^ String storm-name ^ KillOptions options ]
        ;; check-storm-active!檢查topology是否是"active",如果不活躍則拋出異常
       ( check-storm-active! nimbus storm-name true)
        ;; 如果設置了延遲時間,wait-amt綁定延遲時間
       ( let [ wait-amt ( if ( .is_set_wait_secs options)
                        ( .get_wait_secs options)                        
                        )]
          ;; transition-name!函數主要功能就是根據storm-name獲取topology id,然后調用transition!函數,topology由當前狀態轉換到:kill狀態,:kill狀態是一個"臨時狀態",最終修改topology狀態為:killed,:killed狀態為"持久狀態"
          ;; 通過state-transitions函數我們可以知道無論從哪種狀態轉換到:kill狀態,都將調用kill-transition函數,kill-transition通過調用delay-event向storm定時器添加一個定時任務,這個定時任務的主要功能就是負責topology由:killed狀態
          ;; 轉換到:remove狀態,這時將調用remove-storm!函數清理topology
         ( transition-name! nimbus storm-name [ :kill wait-amt ] true)
         ))
      ;; rebalance函數可以重新設置topology的進程數和各個component的並行度,RebalanceOptions是thirft數據結構,有三個屬性rebalance的延遲時間、新的進程數,新的並行度
     ( ^ void rebalance [ this ^ String storm-name ^ RebalanceOptions options ]
        ;; check-storm-active!檢查topology是否是"active",如果不活躍則拋出異常
       ( check-storm-active! nimbus storm-name true)
        ;; 如果設置了延遲時間,wait-amt綁定延遲時間
       ( let [ wait-amt ( if ( .is_set_wait_secs options)
                        ( .get_wait_secs options))
              ;; 如果設置了新的進程數,num-workers綁定新進程數
              num-workers ( if ( .is_set_num_workers options)
                           ( .get_num_workers options))
              ;; 如果設置了新的組件並行度,executor-overrides綁定新組件並行度
              executor-overrides ( if ( .is_set_num_executors options)
                                  ( .get_num_executors options)
                                  {})]
         ( doseq [[ c num-executors ] executor-overrides ]
           ( when ( <= num-executors 0)
             ( throw ( InvalidTopologyException. "Number of executors must be greater than 0"))
             ))
          ;; transition-name!函數主要功能就是根據storm-name獲取topology id,然后調用transition!函數,topology由當前狀態轉換到:rebalance狀態,:rebalance狀態是一個"臨時狀態",最終修改topology狀態為:rebalancing,:rebalancing狀態為"持久狀態"
          ;; 通過state-transitions函數我們可以知道只允許從:active和:inactive狀態轉換到:rebalance狀態,並調用rebalance-transition函數,rebalance-transition通過調用delay-event向storm定時器添加一個定時任務,這個定時任務的主要功能就是負責topology由:rebalancing狀態
          ;; 轉換到:do-rebalance狀態,並調用do-rebalance函數(重新設置topology的進程數和組件並行度,然后調用mk-assignments函數重新進行任務分配),然后將topology狀態修改成:rebalancing的前一個狀態
         ( transition-name! nimbus storm-name [ :rebalance wait-amt num-workers executor-overrides ] true)
         ))
        ;; 激活topology,將topology狀態修改成:active,處理過程與killTopologyWithOpts、rebalance相似
     ( activate [ this storm-name ]
       ( transition-name! nimbus storm-name :activate true)
       )
      ;; 將topology狀態修改成:inactive,deactivate處理過程與activate相似
     ( deactivate [ this storm-name ]
       ( transition-name! nimbus storm-name :inactivate true))
      ;; beginFileUpload()函數獲取nimbus存放jar的目錄
     ( beginFileUpload [ this ]
       ( let [ fileloc ( str ( inbox nimbus) "/stormjar-" ( uuid) ".jar" )]
         ( .put ( :uploaders nimbus)
                fileloc
               ( Channels/newChannel ( FileOutputStream. fileloc)))
         ( log-message "Uploading file from client to " fileloc)
          fileloc
         ))
      ;; 上傳jar包文件
     ( ^ void uploadChunk [ this ^ String location ^ ByteBuffer chunk ]
       ( let [ uploaders ( :uploaders nimbus)
              ^ WritableByteChannel channel ( .get uploaders location )]
         ( when-not channel
           ( throw ( RuntimeException.
                    "File for that location does not exist (or timed out)")))
         ( .write channel chunk)
         ( .put uploaders location channel)
         ))
      ;; 上傳jar包完成,關閉Channel
     ( ^ void finishFileUpload [ this ^ String location ]
       ( let [ uploaders ( :uploaders nimbus)
              ^ WritableByteChannel channel ( .get uploaders location )]
         ( when-not channel
           ( throw ( RuntimeException.
                    "File for that location does not exist (or timed out)")))
         ( .close channel)
         ( log-message "Finished uploading file from client: " location)
         ( .remove uploaders location)
         ))
      ;; 獲取文件輸入流
     ( ^ String beginFileDownload [ this ^ String file ]
       ( let [ is ( BufferFileInputStream. file)
              id ( uuid )]
         ( .put ( :downloaders nimbus) id is)
          id
         ))
      ;; 讀取文件
     ( ^ ByteBuffer downloadChunk [ this ^ String id ]
       ( let [ downloaders ( :downloaders nimbus)
              ^ BufferFileInputStream is ( .get downloaders id )]
         ( when-not is
           ( throw ( RuntimeException.
                    "Could not find input stream for that id")))
         ( let [ ret ( .read is )]
           ( .put downloaders id is)
           ( when ( empty? ret)
             ( .remove downloaders id))
           ( ByteBuffer/wrap ret)
           )))
      ;; 獲取storm集群配置信息
     ( ^ String getNimbusConf [ this ]
       ( to-json ( :conf nimbus)))
      ;; 獲取topology配置信息
     ( ^ String getTopologyConf [ this ^ String id ]
       ( to-json ( try-read-storm-conf conf id)))
      ;; 獲取StormTopology
     ( ^ StormTopology getTopology [ this ^ String id ]
       ( system-topology! ( try-read-storm-conf conf id) ( try-read-storm-topology conf id)))

     ( ^ StormTopology getUserTopology [ this ^ String id ]
       ( try-read-storm-topology conf id))
            ;; 獲取當前集群的匯總信息包括supervisor匯總信息,nimbus啟動時間,所有活躍topology匯總信息
     ( ^ ClusterSummary getClusterInfo [ this ]
       ( let [ storm-cluster-state ( :storm-cluster-state nimbus)
                    ;; supervisor-infos綁定supervisor id->SupervisorInfo對象鍵值對的map
              ;; SupervisorInfo定義:(defrecord SupervisorInfo [time-secs hostname assignment-id used-ports meta scheduler-meta uptime-secs])
              supervisor-infos ( all-supervisor-info storm-cluster-state)
              ;; TODO: need to get the port info about supervisors...
              ;; in standalone just look at metadata, otherwise just say N/A?
              ;; 根據SupervisorInfo數據創建SupervisorSummary數據
              supervisor-summaries ( dofor [[ id info ] supervisor-infos ]
                                         ( let [ ports ( set ( :meta info)) ;;TODO: this is only true for standalone
                                                ]
                                           ( SupervisorSummary. ( :hostname info)
                                                               ( :uptime-secs info)
                                                               ( count ports)
                                                               ( count ( :used-ports info))
                                                                id )
                                           ))
              ;; nimbus-uptime綁定nimbus啟動時間                              
              nimbus-uptime (( :uptime nimbus))
              ;; bases綁定集群上所有活躍topology的StormBase數據集合
              bases ( topology-bases storm-cluster-state)
              ;; topology-summaries綁定活躍topology的TopologySummary數據
              topology-summaries ( dofor [[ id base ] bases ]
                                       ( let [ assignment ( .assignment-info storm-cluster-state id nil )]
                                         ( TopologySummary. id
                                                           ( :storm-name base)
                                                           ( ->> ( :executor->node+port assignment)
                                                                keys
                                                                ( mapcat executor-id->tasks)
                                                                count)
                                                           ( ->> ( :executor->node+port assignment)
                                                                keys
                                                                count)                                                            
                                                           ( ->> ( :executor->node+port assignment)
                                                                vals
                                                                set
                                                                count)
                                                           ( time-delta ( :launch-time-secs base))
                                                           ( extract-status-str base))
                                          ))]
          ;; 創建ClusterSummary數據
         ( ClusterSummary. supervisor-summaries
                          nimbus-uptime
                          topology-summaries)
         ))
      ;; 獲取指定storm-id的topology的TopologyInfo數據
     ( ^ TopologyInfo getTopologyInfo [ this ^ String storm-id ]
        ;; storm-cluster-state綁定StormClusterState對象
       ( let [ storm-cluster-state ( :storm-cluster-state nimbus)
              ;; task->component綁定任務id->組件名稱鍵值對的map,形如:{1 "boltA", 2 "boltA", 3 "boltA", 4 "boltA", 5 "boltB", 6 "boltB"}
              task->component ( storm-task-info ( try-read-storm-topology conf storm-id) ( try-read-storm-conf conf storm-id))
              ;; bases綁storm-id的StormBase
              base ( .storm-base storm-cluster-state storm-id nil)
              ;; assignment綁定該topology的AssignmentInfo信息,(defrecord Assignment [master-code-dir node->host executor->node+port executor->start-time-secs])
              assignment ( .assignment-info storm-cluster-state storm-id nil)
              ;; beats綁定該topology所有executor-id->心跳信息的map
              beats ( .executor-beats storm-cluster-state storm-id ( :executor->node+port assignment))
              ;; all-components綁定該topology所有component-id集合
              all-components ( -> task->component reverse-map keys)
              ;; errors綁定component-id->組件錯誤信息的map
              errors ( ->> all-components
                         ( map ( fn [ c ] [ c ( get-errors storm-cluster-state storm-id c )]))
                         ( into {}))
              ;; executor-summaries綁定ExecutorSummary集合
              executor-summaries ( dofor [[ executor [ node port ]] ( :executor->node+port assignment )]
                                       ( let [ host ( -> assignment :node->host ( get node))
                                              heartbeat ( get beats executor)
                                              stats ( :stats heartbeat)
                                              stats ( if stats
                                                     ( stats/thriftify-executor-stats stats ))]
                                         ( doto
                                             ( ExecutorSummary. ( thriftify-executor-id executor)
                                                               ( -> executor first task->component)
                                                                host
                                                                port
                                                               ( nil-to-zero ( :uptime heartbeat)))
                                           ( .set_stats stats))
                                         ))
              ]
          ;; 創建TopologyInfo對象
         ( TopologyInfo. storm-id
                        ( :storm-name base)
                        ( time-delta ( :launch-time-secs base))
                        executor-summaries
                        ( extract-status-str base)
                        errors
                        )
         ))
     
      Shutdownable
     ( shutdown [ this ]
       ( log-message "Shutting down master")
       ( cancel-timer ( :timer nimbus))
       ( .disconnect ( :storm-cluster-state nimbus))
       ( .cleanup ( :downloaders nimbus))
       ( .cleanup ( :uploaders nimbus))
       ( log-message "Shut down master")
       )
      DaemonCommon
     ( waiting? [ this ]
       ( timer-waiting? ( :timer nimbus))))))

nimbus-data函數定義如下:

nimbus-data函數

 

( defn nimbus-data [ conf inimbus ]
 ( let [ forced-scheduler ( .getForcedScheduler inimbus )]
    ;; 保存storm集群的配置信息
    { :conf conf
    ;; 保存inimbus實例
    :inimbus inimbus
    ;; 初始化topology提交總數為0
    :submitted-count ( atom 0)
    ;; 調用cluster.clj中的mk-storm-cluster-state函數創建StormClusterState實例,StormClusterState實例封裝了與zookeeper交互的接口
    :storm-cluster-state ( cluster/mk-storm-cluster-state conf)
    ;; 保存"提交鎖",在topology提交時,需要先獲取該鎖,然后才能提交,這樣可以防止一次提交多個topology,也保證了topology之間操作的互斥性
    :submit-lock ( Object.)
    ;; 初始化心跳緩存
    :heartbeats-cache ( atom {})
    ;; 創建下載TimeCacheMap緩存,關於TimeCacheMap緩存會在以后文章中單獨分析,在此不做介紹
    :downloaders ( file-cache-map conf)
    ;; 創建上傳TimeCacheMap緩存
    :uploaders ( file-cache-map conf)
    ;; 保存一個返回值為"當前時間"-"nimbus啟動時間"的函數,調用該函數可以獲取nimbus啟動多長時間
    :uptime ( uptime-computer)
    ;; 通過java反射創建一個NIMBUS-TOPOLOGY-VALIDATOR指定的validator對象,默認為backtype.storm.nimbus.DefaultTopologyValidator對象
    :validator ( new-instance ( conf NIMBUS-TOPOLOGY-VALIDATOR))
    ;; mk-timer函數會創建一個"定時線程",關於定時線程會在以后文章中單位分析,在此不做介紹
    :timer ( mk-timer :kill-fn ( fn [ t ]
                                ( log-error t "Error when processing event")
                ;; exit-process!函數通過調用java的Runtime類的exit(int status)方法終止進程,並傳達狀態碼20
                                ( exit-process! 20 "Error when processing an event")
                                ))
    ;; 由mk-scheduler函數創建scheduler調度器,通過分析mk-scheduler函數,可以發現在沒有配置用戶自定義的scheduler情況下,mk-scheduler函數默認返回DefaultScheduler,mk-scheduler函數參見其定義部分
    :scheduler ( mk-scheduler conf inimbus)
    }))

mk-scheduler函數定義如下:

mk-scheduler函數

 

( defn mk-scheduler [ conf inimbus ]
  ;; 當前版本getForcedScheduler函數返回nil
 ( let [ forced-scheduler ( .getForcedScheduler inimbus)
        ;; scheduler綁定IScheduler接口的實現
    ;; cond等價於java中的switch,我們可以發現首先檢查forced-scheduler,如果forced-scheduler為nil,則檢查是否有用戶自定義的scheduler,如果沒有則
    ;; 使用默認的DefaultScheduler
        scheduler ( cond
                    forced-scheduler
                   ( do ( log-message "Using forced scheduler from INimbus " ( class forced-scheduler))
                        forced-scheduler)
   
                   ( conf STORM-SCHEDULER)
                   ( do ( log-message "Using custom scheduler: " ( conf STORM-SCHEDULER))
                       ( -> ( conf STORM-SCHEDULER) new-instance))
   
                    :else
                   ( do ( log-message "Using default scheduler")
                       ( DefaultScheduler. )))]
    ;; 先調用prepare函數
   ( .prepare scheduler conf)
    ;; 然后返回scheduler
    scheduler
   ))

cleanup-corrupt-topologies!函數定義如下:

cleanup-corrupt-topologies!函數

 

( defn cleanup-corrupt-topologies! [ nimbus ]
  ;; 獲取nimbus這個map中保存的StormCluterState實例
 ( let [ storm-cluster-state ( :storm-cluster-state nimbus)
        ;; code-ids綁定了nimbus服務器上{storm.local.dir}/nimbus/stormdist/目錄下所有子目錄的名稱,即提交給nimbus的所有topology的id
        code-ids ( set ( code-ids ( :conf nimbus)))
    ;; active-topologies綁定zookeeper上/storms/目錄中所有文件名稱,即當前storm集群上正在運行的topology的id
        active-topologies ( set ( .active-storms storm-cluster-state))
    ;; corrupt-topologies綁定active-topologies和code-ids的差集,即當前正在運行的,但丟失jar包、topology信息和配置信息的topology的id
        corrupt-topologies ( set/difference active-topologies code-ids )]
    ;; 將id包含在corrupt-topologies集合的topology的分配信息從zookeeper的/assignments目錄刪除,同時將StormBase信息從zookeeper的/storms目錄刪除
   ( doseq [ corrupt corrupt-topologies ]
     ( log-message "Corrupt topology " corrupt " has state on zookeeper but doesn't have a local dir on Nimbus. Cleaning up...")
     ( .remove-storm! storm-cluster-state corrupt)
     )))

transition!函數定義如下:

transition!函數的作用十分重要,負責topology狀態轉換,在啟動nimbus場景下,event的值為":startup"關鍵字,error-on-no-transition?的值為false。transition!函數有兩個重載版本。

transition!函數

 

( defn transition!
  ([ nimbus storm-id event ]
    ( transition! nimbus storm-id event false))
  ([ nimbus storm-id event error-on-no-transition? ]
    ;; 加鎖
    ( locking ( :submit-lock nimbus)
      ;; system-events綁定一個集合#{:startup}
      ( let [ system-events # { :startup }
            ;; 在啟動nimbus場景下,event綁定[:startup],event-args為nil
            [ event & event-args ] ( if ( keyword? event) [ event ] event)
            ;; 從zookeeper上獲取topology的狀態,一個map對象,綁定到status上
            status ( topology-status nimbus storm-id )]
        ;; handles the case where event was scheduled but topology has been removed
        ( if-not status
          ;; 如果status為nil則記錄日志,transition!函數執行結束
          ( log-message "Cannot apply event " event " to " storm-id " because topology no longer exists")
          ;; 如果status不為nil,get-event綁定一個函數
          ( let [ get-event ( fn [ m e ]
                            ( if ( contains? m e)
                              ( m e)
                              ( let [ msg ( str "No transition for event: " event
                                              ", status: " status,
                                              " storm-id: " storm-id )]
                                ( if error-on-no-transition?
                                  ( throw-runtime msg)
                                  ( do ( when-not ( contains? system-events event)
                                        ( log-message msg))
                                      nil))
                                )))
                ;; state-transitions函數返回一個狀態轉換映射map,這個map中規定了由一種狀態可以轉換到哪些狀態,並且在狀態轉換后執行哪些處理(即調用哪個函數),參見其定義部分
                ;; 通過分析state-transitions函數,我們可以發現只有當topology的當前狀態為":killed"和":rebalancing"時,才允許轉換到":startup"狀態,如果當前狀態是其他狀態,transition將為nil
                ;; 我們先討論其他狀態,這時transition為nil,接着transition通過if判斷將綁定一個(fn [] nil)函數,這樣new-status將為nil。所以在啟動nimbus場景下,topology由其他狀態轉換到":startup"狀態時,transition!函數什么都沒做
                transition ( -> ( state-transitions nimbus storm-id status)
                               ( get ( :type status))
                               ( get-event event))
                transition ( if ( or ( nil? transition)
                                   ( keyword? transition))
                             ( fn [] transition)
                              transition)
                new-status ( apply transition event-args)
                new-status ( if ( keyword? new-status)
                              { :type new-status }
                              new-status )]
            ( when new-status
              ( set-topology-status! nimbus storm-id new-status)))))
      )))

1、如果topology由":killed"轉換到":startup"(kill topology的過程中,nimbus掛掉了,當重啟nimbus時就有可能出現這種狀態轉換)時,transition將綁定

( fn [] ( delay-event nimbus
                    storm-id
                   ( :kill-time-secs status)
                    :remove)
    nil)

new-status值為transition綁定的函數的返回值nil。transition綁定的函數通過調用delay-event函數將#(transition! nimbus storm-id :remove false)函數添加到storm定時器中,然后由storm定時器執行該函數,該函數再次調用了transition!函數,不過這次是由":killed"轉換到":remove", 調用函數

( fn []
   ( log-message "Killing topology: " storm-id)
    ;; 刪除zookeeper上該topology的StormBase信息和分配信息
   ( .remove-storm! ( :storm-cluster-state nimbus)
                    storm-id)
    nil)

2、如果topology由":rebalancing"轉換到":startup"(rebalance topology的過程中,nimbus掛掉了,當重啟nimbus時就有可能出現這種狀態轉換)時,transition將綁定

( fn [] ( delay-event nimbus
                storm-id
               ( :delay-secs status)
                :do-rebalance)
    nil)

new-status值為transition綁定的函數的返回值nil。transition綁定的函數通過調用delay-event函數將#(transition! nimbus storm-id :do-rebalance false)函數添加到storm定時器中,然后由storm定時器執行該函數,該函數再次調用了transition!函數,不過這次是由":rebalancing"轉換到":do-rebalance",調用函數

( fn []
  ( do-rebalance nimbus storm-id status)
  ( :old-status status))

由於這個函數返回:rebalancing狀態的前一個狀態,所以storm定時器所執行的定時任務會將topology的狀態由:rebalancing修改成前一個狀態。以上就是啟動nimbus場景下,topology可能的狀態轉換處理過程。 delay-event函數定義如下:主要功能就是將#(transition! nimbus storm-id event false)函數作為"定時任務"添加到storm定時器中。

( defn delay-event [ nimbus storm-id delay-secs event ]
 ( log-message "Delaying event " event " for " delay-secs " secs for " storm-id)
 ( schedule ( :timer nimbus)
            delay-secs
            #( transition! nimbus storm-id event false)
           ))

state-transitions函數定義如下:

state-transitions函數

 

( defn state-transitions [ nimbus storm-id status ]
  { :active { :inactivate :inactive            
            :activate nil
            :rebalance ( rebalance-transition nimbus storm-id status)
            :kill ( kill-transition nimbus storm-id)
            }
  :inactive { :activate :active
              :inactivate nil
              :rebalance ( rebalance-transition nimbus storm-id status)
              :kill ( kill-transition nimbus storm-id)
              }
  :killed { :startup ( fn [] ( delay-event nimbus
                                        storm-id
                                        ( :kill-time-secs status)
                                        :remove)
                            nil)
            :kill ( kill-transition nimbus storm-id)
            :remove ( fn []
                     ( log-message "Killing topology: " storm-id)
                     ( .remove-storm! ( :storm-cluster-state nimbus)
                                      storm-id)
                      nil)
            }
  :rebalancing { :startup ( fn [] ( delay-event nimbus
                                              storm-id
                                             ( :delay-secs status)
                                              :do-rebalance)
                                nil)
                :kill ( kill-transition nimbus storm-id)
                :do-rebalance ( fn []
                                ( do-rebalance nimbus storm-id status)
                                ( :old-status status))
                }})

do-cleanup函數定義如下:

do-cleanup函數

 

( defn do-cleanup [ nimbus ]
 ( let [ storm-cluster-state ( :storm-cluster-state nimbus)
        conf ( :conf nimbus)
        submit-lock ( :submit-lock nimbus )]
    ;; to-cleanup-ids綁定需要清理的topology的id,即不再活躍的topology的id,cleanup-storm-ids函數參見其定義部分
   ( let [ to-cleanup-ids ( locking submit-lock
                          ( cleanup-storm-ids conf storm-cluster-state ))]
     ( when-not ( empty? to-cleanup-ids)
       ( doseq [ id to-cleanup-ids ]
         ( log-message "Cleaning up " id)
          ;; 從zookeeper上刪除/workerbeats/{id}節點(清理其心跳信息)
         ( .teardown-heartbeats! storm-cluster-state id)
          ;; 從zookeeper上刪除/errors/{id}節點(清理其錯誤信息)
         ( .teardown-topology-errors! storm-cluster-state id)
          ;; 從nimbus服務器上刪除{storm.local.dir}/nimbus/stormdist/{id}目錄(刪除其jar包,topology信息,配置信息)
         ( rmr ( master-stormdist-root conf id))
          ;; 將該topology的心跳信息從nimbus的心跳緩存中刪除
         ( swap! ( :heartbeats-cache nimbus) dissoc id))
       ))))

cleanup-storm-ids函數定義如下:

cleanup-storm-ids函數

 

( defn cleanup-storm-ids [ conf storm-cluster-state ]
  ;; heartbeat-ids綁定有心跳的topology的id集合
 ( let [ heartbeat-ids ( set ( .heartbeat-storms storm-cluster-state))
        ;; error-ids綁定有錯誤信息的topology的id集合
        error-ids ( set ( .error-topologies storm-cluster-state))
        ;; code-ids綁定在nimbus服務器上有jar包的topology的id集合
        code-ids ( code-ids conf)
        ;; assigned-ids綁定當前活躍的topology的id集合
        assigned-ids ( set ( .active-storms storm-cluster-state ))]
    ;; heartbeat-ids、error-ids、code-ids的並集再與assigned-ids做差集就是不活躍的topology的id
   ( set/difference ( set/union heartbeat-ids error-ids code-ids) assigned-ids)
   ))

以上就是nimbus啟動源碼分析。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM