Storm source code analysis: the topology submission process


A Storm cluster runs topologies, and a topology is a graph of spouts and bolts. After developing a topology and packaging it into a jar, we run storm jar xxxxxx.jar xxxxxxxClass in the shell; this uploads the jar to the cluster's nimbus and starts the topology. This article analyzes how the topology jar gets uploaded to nimbus. We start with storm's jar command, implemented in bin/storm under the Storm root directory. It is defined as follows:

def jar(jarfile, klass, *args):
    """Syntax: [storm jar topology-jar-path class ...]

    Runs the main method of class with the specified arguments.
    The storm jars and configs in ~/.storm are put on the classpath.
    The process is configured so that StormSubmitter
    (http://nathanmarz.github.com/storm/doc/backtype/storm/StormSubmitter.html)
    will upload the jar at topology-jar-path when the topology is submitted.
    """
    exec_storm_class(
        klass,
        jvmtype="-client",
        extrajars=[jarfile, USER_CONF_DIR, STORM_DIR + "/bin"],
        args=args,
        jvmopts=[' '.join(filter(None, [JAR_JVM_OPTS, "-Dstorm.jar=" + jarfile]))])

The jar command is implemented in Python; curiously, it is not implemented in Clojure (the reason is unclear). jarfile is the path to the jar; klass is the topology's entry point, i.e. the class containing the main method; *args are the arguments passed to main. jvmtype="-client" selects the client JVM type (the JVM has two types, client and server; on server machines the default is server). The extrajars list holds the paths of all jars the topology depends on at compile time. The jvmopts list holds JVM options; the important one here is -Dstorm.jar, whose value is jarfile. This way, when submitTopology later runs, it can recover the jar's path from the storm.jar system property (method-parameter passing via JVM properties). The logic of exec_storm_class is straightforward:

def exec_storm_class(klass, jvmtype="-server", jvmopts=[], extrajars=[], args=[], fork=False):
    global CONFFILE
    all_args = [
        "java", jvmtype, get_config_opts(),
        "-Dstorm.home=" + STORM_DIR,
        "-Djava.library.path=" + confvalue("java.library.path", extrajars),
        "-Dstorm.conf.file=" + CONFFILE,
        "-cp", get_classpath(extrajars),
    ] + jvmopts + [klass] + list(args)
    print "Running: " + " ".join(all_args)
    if fork:
        os.spawnvp(os.P_WAIT, "java", all_args)
    else:
        os.execvp("java", all_args) # replaces the current process and never returns
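The command assembly done here can be sketched in plain Python; build_java_command is a hypothetical helper, and the real script additionally injects the storm.home, java.library.path and storm.conf.file system properties:

```python
def build_java_command(klass, jvmtype="-server", jvmopts=None, classpath=".", args=None):
    # Assemble the argv list that exec_storm_class hands to os.execvp:
    # java <jvmtype> -cp <classpath> <jvm options> <main class> <args...>
    jvmopts = jvmopts or []
    args = args or []
    return ["java", jvmtype, "-cp", classpath] + jvmopts + [klass] + list(args)

# Roughly what `storm jar wordcount.jar storm.starter.WordCountTopology word-count` becomes:
cmd = build_java_command(
    "storm.starter.WordCountTopology",
    jvmtype="-client",
    jvmopts=["-Dstorm.jar=/tmp/wordcount.jar"],
    classpath="/tmp/wordcount.jar:~/.storm",
    args=["word-count"])
```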

get_config_opts() fetches the default JVM options, confvalue("java.library.path", extrajars) resolves the load path of the native library (JZMQ) that Storm uses, and get_classpath(extrajars) resolves the full paths of all dependency jars. These are stitched into a java -cp command that runs the topology's main method, at which point execution moves into the topology itself. Take the main method of WordCountTopology from the storm-starter project as an example:

public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout("spout", new RandomSentenceSpout(), 6);

    builder.setBolt("split", new SplitSentence(), 12).shuffleGrouping("spout");
    builder.setBolt("count", new WordCount(), 10).fieldsGrouping("split", new Fields("word"));

    Config conf = new Config();
    conf.setDebug(true);

    if (args != null && args.length > 0) {
        conf.setNumWorkers(4);

        StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    }
    else {
        conf.setMaxTaskParallelism(3);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("word-count", conf, builder.createTopology());

        Thread.sleep(10000);

        cluster.shutdown();
    }
}

After building the topology, main calls StormSubmitter.submitTopology to submit it. The submitTopology method:

/**
 * Submits a topology to run on the cluster. A topology runs forever or until
 * explicitly killed.
 *
 * @param name the name of the storm.
 * @param stormConf the topology-specific configuration. See {@link Config}.
 * @param topology the processing to execute.
 * @throws AlreadyAliveException if a topology with this name is already running
 * @throws InvalidTopologyException if an invalid topology was submitted
 */
public static void submitTopology(String name, Map stormConf, StormTopology topology)
        throws AlreadyAliveException, InvalidTopologyException {
    submitTopology(name, stormConf, topology, null);
}

/**
 * Submits a topology to run on the cluster. A topology runs forever or until
 * explicitly killed.
 *
 * @param name the name of the storm.
 * @param stormConf the topology-specific configuration. See {@link Config}.
 * @param topology the processing to execute.
 * @param opts to manipulate the starting of the topology
 * @throws AlreadyAliveException if a topology with this name is already running
 * @throws InvalidTopologyException if an invalid topology was submitted
 */
public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts)
        throws AlreadyAliveException, InvalidTopologyException {
    if (!Utils.isValidConf(stormConf)) {
        throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
    }
    stormConf = new HashMap(stormConf);
    stormConf.putAll(Utils.readCommandLineOpts());
    Map conf = Utils.readStormConfig();
    conf.putAll(stormConf);
    try {
        String serConf = JSONValue.toJSONString(stormConf);
        if (localNimbus != null) {
            LOG.info("Submitting topology " + name + " in local mode");
            localNimbus.submitTopology(name, null, serConf, topology);
        } else {
            NimbusClient client = NimbusClient.getConfiguredClient(conf);
            if (topologyNameExists(conf, name)) {
                throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
            }
            submitJar(conf);
            try {
                LOG.info("Submitting topology " + name + " in distributed mode with conf " + serConf);
                if (opts != null) {
                    client.getClient().submitTopologyWithOpts(name, submittedJar, serConf, topology, opts);
                } else {
                    // this is for backwards compatibility
                    client.getClient().submitTopology(name, submittedJar, serConf, topology);
                }
            } catch (InvalidTopologyException e) {
                LOG.warn("Topology submission exception", e);
                throw e;
            } catch (AlreadyAliveException e) {
                LOG.warn("Topology already alive exception", e);
                throw e;
            } finally {
                client.close();
            }
        }
        LOG.info("Finished submitting topology: " + name);
    } catch (TException e) {
        throw new RuntimeException(e);
    }
}

The submitTopology method does three main things:

1. Assemble the configuration
Command-line options are put into stormConf; the configuration from conf/storm.yaml is read into conf; then stormConf is also put into conf, so command-line options take higher priority. Finally stormConf is serialized to JSON, because this configuration has to be sent to the server.
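The merge order can be sketched as a pair of dict updates where later writes win (key names are illustrative):

```python
def effective_conf(cluster_conf, topology_conf, cmdline_opts):
    # Later updates win: storm.yaml defaults < topology conf < command-line opts.
    conf = dict(cluster_conf)
    conf.update(topology_conf)
    conf.update(cmdline_opts)
    return conf

merged = effective_conf(
    {"topology.workers": 1, "topology.debug": False},  # conf/storm.yaml
    {"topology.workers": 4},                           # stormConf from main()
    {"topology.debug": True})                          # command-line overrides
```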

2. Call the submitJar method

private static void submitJar(Map conf) {
    if (submittedJar == null) {
        LOG.info("Jar not uploaded to master yet. Submitting jar...");
        String localJar = System.getProperty("storm.jar");
        submittedJar = submitJar(conf, localJar);
    } else {
        LOG.info("Jar already uploaded to master. Not submitting jar.");
    }
}

System.getProperty("storm.jar") reads the storm.jar JVM property, i.e. the path of the topology jar, and then calls the overloaded submitJar:

public static String submitJar(Map conf, String localJar) {
    if (localJar == null) {
        throw new RuntimeException("Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload.");
    }
    NimbusClient client = NimbusClient.getConfiguredClient(conf);
    try {
        String uploadLocation = client.getClient().beginFileUpload();
        LOG.info("Uploading topology jar " + localJar + " to assigned location: " + uploadLocation);
        BufferFileInputStream is = new BufferFileInputStream(localJar);
        while (true) {
            byte[] toSubmit = is.read();
            if (toSubmit.length == 0) break;
            client.getClient().uploadChunk(uploadLocation, ByteBuffer.wrap(toSubmit));
        }
        client.getClient().finishFileUpload(uploadLocation);
        LOG.info("Successfully uploaded topology jar to assigned location: " + uploadLocation);
        return uploadLocation;
    } catch (Exception e) {
        throw new RuntimeException(e);
    } finally {
        client.close();
    }
}
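The upload loop above reads the jar until EOF and sends each chunk over Thrift; a Python sketch of the same loop (the chunk size here is illustrative, not BufferFileInputStream's actual default):

```python
import io

CHUNK_SIZE = 300 * 1024  # illustrative chunk size

def upload_in_chunks(stream, send_chunk):
    # Read until EOF, handing each chunk to send_chunk -- the same loop
    # StormSubmitter runs against uploadChunk().
    total = 0
    while True:
        chunk = stream.read(CHUNK_SIZE)
        if not chunk:  # an empty read means EOF, like toSubmit.length == 0
            break
        send_chunk(chunk)
        total += len(chunk)
    return total

received = []
sent = upload_in_chunks(io.BytesIO(b"x" * 700000), received.append)
```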

StormSubmitter is essentially a Thrift client, and Nimbus is a Thrift server, so every operation is carried out through Thrift RPC. submitJar first creates a client, then calls the nimbus thrift server's beginFileUpload() method to obtain the location where nimbus stores jars. The beginFileUpload function:

(beginFileUpload [this]
  (let [fileloc (str (inbox nimbus) "/stormjar-" (uuid) ".jar")]
    (.put (:uploaders nimbus)
          fileloc
          (Channels/newChannel (FileOutputStream. fileloc)))
    (log-message "Uploading file from client to " fileloc)
    fileloc
    ))

(inbox nimbus) in turn calls the master-inbox function, which creates the inbox directory under nimbus's local dir and returns its full path, so the topology jar is uploaded via uploadChunk to {storm.local.dir}/nimbus/inbox/stormjar-{uuid}.jar on nimbus.
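The resulting upload path can be sketched as follows (assuming uuid4's 32-character hex form; Storm's own uuid helper may format the UUID differently):

```python
import os
import uuid

def jar_upload_location(storm_local_dir):
    # Mirror of (str (inbox nimbus) "/stormjar-" (uuid) ".jar")
    return os.path.join(storm_local_dir, "nimbus", "inbox",
                        "stormjar-" + uuid.uuid4().hex + ".jar")

loc = jar_upload_location("/var/storm")
```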

3. Create a thrift client and call the nimbus thrift server's submitTopologyWithOpts or submitTopology method (both are defined in nimbus.clj). submitTopologyWithOpts:

(^void submitTopologyWithOpts
  [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology
   ^SubmitOptions submitOptions]
  (try
    (assert (not-nil? submitOptions))
    (validate-topology-name! storm-name)
    (check-storm-active! nimbus storm-name false)
    (let [topo-conf (from-json serializedConf)]
      (try
        (validate-configs-with-schemas topo-conf)
        (catch IllegalArgumentException ex
          (throw (InvalidTopologyException. (.getMessage ex)))))
      (.validate ^backtype.storm.nimbus.ITopologyValidator (:validator nimbus)
                 storm-name
                 topo-conf
                 topology))
    (swap! (:submitted-count nimbus) inc)
    (let [storm-id (str storm-name "-" @(:submitted-count nimbus) "-" (current-time-secs))
          storm-conf (normalize-conf
                      conf
                      (-> serializedConf
                          from-json
                          (assoc STORM-ID storm-id)
                          (assoc TOPOLOGY-NAME storm-name))
                      topology)
          total-storm-conf (merge conf storm-conf)
          topology (normalize-topology total-storm-conf topology)
          storm-cluster-state (:storm-cluster-state nimbus)]
      (system-topology! total-storm-conf topology) ;; this validates the structure of the topology
      (log-message "Received topology submission for " storm-name " with conf " storm-conf)
      ;; lock protects against multiple topologies being submitted at once and
      ;; cleanup thread killing topology in b/w assignment and starting the topology
      (locking (:submit-lock nimbus)
        (setup-storm-code conf storm-id uploadedJarLocation storm-conf topology)
        (.setup-heartbeats! storm-cluster-state storm-id)
        (let [thrift-status->kw-status {TopologyInitialStatus/INACTIVE :inactive
                                        TopologyInitialStatus/ACTIVE :active}]
          (start-storm nimbus storm-name storm-id (thrift-status->kw-status (.get_initial_status submitOptions))))
        (mk-assignments nimbus)))
    (catch Throwable e
      (log-warn-error e "Topology submission exception. (topology name='" storm-name "')")
      (throw e))))

storm-name is the topology's name; uploadedJarLocation is the jar's location on nimbus; serializedConf is the topology's serialized configuration; the topology parameter is the thrift representation of the topology, whose structure is defined in storm.thrift:

struct StormTopology {
  // ids must be unique across maps
  // # workers to use is in conf
  1: required map<string, SpoutSpec> spouts;
  2: required map<string, Bolt> bolts;
  3: required map<string, StateSpoutSpec> state_spouts;
}

spouts maps spout ids to spouts and bolts maps bolt ids to bolts; StateSpoutSpec is not yet implemented. SpoutSpec is defined as:

struct SpoutSpec {
  1: required ComponentObject spout_object;
  2: required ComponentCommon common;
  // can force a spout to be non-distributed by overriding the component configuration
  // and setting TOPOLOGY_MAX_TASK_PARALLELISM to 1
}

Bolt is defined as:

struct Bolt {
  1: required ComponentObject bolt_object;
  2: required ComponentCommon common;
}

Bolt and Spout share the same structure: one ComponentObject plus one ComponentCommon. ComponentObject is defined as:

union ComponentObject {
  1: binary serialized_java;
  2: ShellComponent shell;
  3: JavaObject java_object;
}

ComponentObject is the actual implementation of the bolt, and it can be one of three types:

1. A serialized Java object (one that implements IBolt).
2. A ShellComponent object, meaning the bolt is implemented in another language. Defining a bolt this way makes Storm instantiate a ShellBolt object to handle communication between the JVM-based worker process and the non-JVM component implementation (i.e. this bolt).
3. A JavaObject structure, which tells Storm the classname and constructor arguments needed to instantiate the bolt. This is useful when you want to define a topology in a non-JVM language: you can use JVM-based spouts and bolts without having to create and serialize their Java objects yourself.

ComponentCommon is defined as:

struct ComponentCommon {
  1: required map<GlobalStreamId, Grouping> inputs;
  2: required map<string, StreamInfo> streams; // key is stream id
  3: optional i32 parallelism_hint; // how many threads across the cluster should be dedicated to this component

  // component specific configuration respects:
  // topology.debug: false
  // topology.max.task.parallelism: null // can replace isDistributed with this
  // topology.max.spout.pending: null
  // topology.kryo.register // this is the only additive one

  // component specific configuration
  4: optional string json_conf;
}

GlobalStreamId is defined as:

struct GlobalStreamId {
  1: required string componentId;
  2: required string streamId;
  # Going to need to add an enum for the stream type (NORMAL or FAILURE)
}

ComponentCommon defines all the component's remaining properties, including:

1. Which streams the component consumes (defined in a map from component_id to stream_id, used when grouping streams).
2. Which streams the component emits, plus their metadata (whether a stream is a direct stream, the fields declared on the stream).
3. The component's parallelism.
4. The component's configuration.

(assert (not-nil? submitOptions)) throws java.lang.AssertionError if submitOptions is nil; (validate-topology-name! storm-name) validates the topology name. validate-topology-name! is defined as:

(defn validate-topology-name! [name]
  (if (some #(.contains name %) DISALLOWED-TOPOLOGY-NAME-STRS)
    (throw (InvalidTopologyException.
            (str "Topology name cannot contain any of the following: " (pr-str DISALLOWED-TOPOLOGY-NAME-STRS)))))
  (if (clojure.string/blank? name)
    (throw (InvalidTopologyException.
            "Topology name cannot be blank"))))

DISALLOWED-TOPOLOGY-NAME-STRS is defined as:

(def DISALLOWED-TOPOLOGY-NAME-STRS #{"/" "." ":" "\\"})
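The check performed by validate-topology-name! translates directly into Python:

```python
DISALLOWED_TOPOLOGY_NAME_STRS = {"/", ".", ":", "\\"}

def validate_topology_name(name):
    # Reject names containing a disallowed character, then reject blank names.
    if any(ch in name for ch in DISALLOWED_TOPOLOGY_NAME_STRS):
        raise ValueError("Topology name cannot contain any of: "
                         + repr(sorted(DISALLOWED_TOPOLOGY_NAME_STRS)))
    if not name.strip():
        raise ValueError("Topology name cannot be blank")

validate_topology_name("word-count")  # a legal name passes silently
```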

It contains the special characters that may not appear in a topology name. The first argument to some is an anonymous function applied to each element of DISALLOWED-TOPOLOGY-NAME-STRS, returning true on the first match. So validate-topology-name! checks whether the topology name contains an "illegal character". check-storm-active! then checks whether a topology of that name is already "active". It is defined as:

(defn check-storm-active! [nimbus storm-name active?]
  (if (= (not active?)
         (storm-active? (:storm-cluster-state nimbus)
                        storm-name))
    (if active?
      (throw (NotAliveException. (str storm-name " is not alive")))
      (throw (AlreadyAliveException. (str storm-name " is already active"))))
    ))

Here nimbus is a map holding the current state of the nimbus thrift server, produced by the nimbus-data function:

(defn nimbus-data [conf inimbus]
  (let [forced-scheduler (.getForcedScheduler inimbus)]
    {:conf conf
     :inimbus inimbus
     :submitted-count (atom 0)
     :storm-cluster-state (cluster/mk-storm-cluster-state conf)
     :submit-lock (Object.)
     :heartbeats-cache (atom {})
     :downloaders (file-cache-map conf)
     :uploaders (file-cache-map conf)
     :uptime (uptime-computer)
     :validator (new-instance (conf NIMBUS-TOPOLOGY-VALIDATOR))
     :timer (mk-timer :kill-fn (fn [t]
                                 (log-error t "Error when processing event")
                                 (exit-process! 20 "Error when processing an event")
                                 ))
     :scheduler (mk-scheduler conf inimbus)
     }))

conf holds the Storm cluster's configuration; inimbus is the current nimbus instance; cluster/mk-storm-cluster-state returns an instance implementing the StormClusterState protocol. storm-active? is defined as:

(defn storm-active? [storm-cluster-state storm-name]
  (not-nil? (get-storm-id storm-cluster-state storm-name)))

It calls get-storm-id to look up the topology id for the given topology name, returning true if the id exists and false otherwise. get-storm-id:

(defn get-storm-id [storm-cluster-state storm-name]
  (let [active-storms (.active-storms storm-cluster-state)]
    (find-first
     #(= storm-name (:storm-name (.storm-base storm-cluster-state % nil)))
     active-storms)
    ))
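A Python analogue of get-storm-id and check-storm-active!, assuming the active topologies are given as a dict from topology id to a StormBase-like map:

```python
def get_storm_id(active_storms, storm_name):
    # find-first over the children of /storms: return the id of the first
    # active topology whose storm-base name matches, else None.
    for storm_id, base in active_storms.items():
        if base["storm-name"] == storm_name:
            return storm_id
    return None

def check_storm_active(active_storms, storm_name, expect_active):
    active = get_storm_id(active_storms, storm_name) is not None
    if active != expect_active:
        if expect_active:
            raise RuntimeError(storm_name + " is not alive")
        raise RuntimeError(storm_name + " is already active")

storms = {"word-count-1-1700000000": {"storm-name": "word-count"}}
```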

The active-storms function lists all children of /storms/ in zookeeper; /storms/{topology-id} holds the information of the currently running topologies. For what is stored there, see the StormBase record in common.clj:

(defrecord StormBase [storm-name launch-time-secs status num-workers component->executors])

find-first returns the id of the first topology whose name equals storm-name. On a correct fresh submission there is no matching {topology-id} node under /storms in zookeeper, so the condition of the first if in check-storm-active! evaluates as (= true true) and the check passes. The topology's configuration is then bound to topo-conf, and validate-configs-with-schemas verifies that the configuration is valid. validate-configs-with-schemas is defined as:

(defn validate-configs-with-schemas
  [conf]
  (doseq [[k v] conf
          :let [schema (CONFIG-SCHEMA-MAP k)]]
    (if (not (nil? schema))
      (.validateField schema k v))))

CONFIG-SCHEMA-MAP is defined as:

;; Create a mapping of config-string -> validator
;; Config fields must have a _SCHEMA field defined
(def CONFIG-SCHEMA-MAP
  (->> (.getFields Config)
       (filter #(not (re-matches #".*_SCHEMA$" (.getName %))))
       (map (fn [f] [(.get f nil)
                     (get-FieldValidator
                      (-> Config
                          (.getField (str (.getName f) "_SCHEMA"))
                          (.get nil)))]))
       (into {})))
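The lookup-and-validate flow can be sketched in Python; PositiveIntValidator here is a hypothetical stand-in for the FieldValidator instances reflected out of Config.java:

```python
class PositiveIntValidator:
    # Hypothetical validator, standing in for a FieldValidator.
    def validate_field(self, name, value):
        if not isinstance(value, int) or value <= 0:
            raise ValueError(name + " must be a positive integer")

# Sketch of CONFIG-SCHEMA-MAP: config key -> validator.
CONFIG_SCHEMA_MAP = {"topology.workers": PositiveIntValidator()}

def validate_configs_with_schemas(conf):
    # Keys without a registered validator are skipped, as in the Clojure version.
    for key, value in conf.items():
        validator = CONFIG_SCHEMA_MAP.get(key)
        if validator is not None:
            validator.validate_field(key, value)
```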

Config.java holds two kinds of static fields: configuration entries, and the validators for those entries, whose field names end in _SCHEMA. CONFIG-SCHEMA-MAP stores the mapping from configuration field name to its validator (config-string -> validator).
validate-configs-with-schemas simply looks up each configuration key's validator and validates its value; for the validators themselves, see the FieldValidator inner class of ConfigValidation. (:validator nimbus) returns an instance implementing the backtype.storm.nimbus.ITopologyValidator interface (a backtype.storm.nimbus.DefaultTopologyValidator instance) and calls its validate method. The DefaultTopologyValidator class:

public class DefaultTopologyValidator implements ITopologyValidator {
    @Override
    public void prepare(Map StormConf) {
    }
    @Override
    public void validate(String topologyName, Map topologyConf, StormTopology topology) throws InvalidTopologyException {
    }
}

By default the validate method is a no-op.
swap! increments the atom (:submitted-count nimbus) (an atom is Clojure's atomic reference type, analogous to Java's atomic types), which counts the topologies submitted so far. storm-id is bound to the topology's id. storm-conf is bound to the result of merging the topology configuration with the cluster configuration: the serializer settings, the classes to register for serialization, the acker count and the max task parallelism. total-storm-conf binds the complete configuration. normalize-topology's main job is to add the "topology.tasks" entry (the total task count) to the topology's configuration.
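The storm-id format built in the let binding above can be reproduced as:

```python
import time

def make_storm_id(storm_name, submitted_count):
    # {name}-{submission counter}-{epoch seconds}, e.g. word-count-3-1700000000
    return "%s-%d-%d" % (storm_name, submitted_count, int(time.time()))

storm_id = make_storm_id("word-count", 3)
```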

normalize-topology is defined as:

(defn normalize-topology [storm-conf ^StormTopology topology]
  (let [ret (.deepCopy topology)]
    (doseq [[_ component] (all-components ret)]
      (.set_json_conf
       (.get_common component)
       (->> {TOPOLOGY-TASKS (component-parallelism storm-conf component)}
            (merge (component-conf component))
            to-json)))
    ret))

ret is bound to a deep copy of the topology. all-components returns a map from component id to spout/bolt object, and get_common then fetches each spout/bolt's ComponentCommon. ->> is a Clojure threading macro: it passes {......} as the last argument to merge, and merge's result as the last argument to to-json. component-parallelism is defined as:

(defn- component-parallelism [storm-conf component]
  (let [storm-conf (merge storm-conf (component-conf component))
        num-tasks (or (storm-conf TOPOLOGY-TASKS) (num-start-executors component))
        max-parallelism (storm-conf TOPOLOGY-MAX-TASK-PARALLELISM)
        ]
    (if max-parallelism
      (min max-parallelism num-tasks)
      num-tasks)))

component-parallelism is a private function whose job is to determine the value of "topology.tasks". num-start-executors fetches the spout/bolt's parallelism (defaulting to 1 when none is set); num-tasks binds the component's task count, max-parallelism binds the maximum allowed task count, and the smaller of the two is returned. normalize-topology then stores the configuration, now including "topology.tasks", into the json_conf of each spout/bolt's ComponentCommon and returns the modified topology.
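The resolution of "topology.tasks" can be sketched as:

```python
def component_parallelism(storm_conf, component_conf, num_start_executors):
    # Explicit topology.tasks wins, else the executor count; either is
    # capped by topology.max.task.parallelism when that is set.
    conf = dict(storm_conf)
    conf.update(component_conf)
    num_tasks = conf.get("topology.tasks") or num_start_executors
    max_parallelism = conf.get("topology.max.task.parallelism")
    return min(max_parallelism, num_tasks) if max_parallelism else num_tasks
```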
system-topology! is defined as:

(defn system-topology! [storm-conf ^StormTopology topology]
  (validate-basic! topology)
  (let [ret (.deepCopy topology)]
    (add-acker! storm-conf ret)
    (add-metric-components! storm-conf ret)
    (add-system-components! storm-conf ret)
    (add-metric-streams! ret)
    (add-system-streams! ret)
    (validate-structure! ret)
    ret
    ))

validate-basic! checks the topology's basic structure, and add-acker! adds the acker bolt. add-acker! is defined as:

(defn add-acker! [storm-conf ^StormTopology ret]
  (let [num-executors (if (nil? (storm-conf TOPOLOGY-ACKER-EXECUTORS)) (storm-conf TOPOLOGY-WORKERS) (storm-conf TOPOLOGY-ACKER-EXECUTORS))
        acker-bolt (thrift/mk-bolt-spec* (acker-inputs ret)
                                         (new backtype.storm.daemon.acker)
                                         {ACKER-ACK-STREAM-ID (thrift/direct-output-fields ["id"])
                                          ACKER-FAIL-STREAM-ID (thrift/direct-output-fields ["id"])
                                          }
                                         :p num-executors
                                         :conf {TOPOLOGY-TASKS num-executors
                                                TOPOLOGY-TICK-TUPLE-FREQ-SECS (storm-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)})]
    (dofor [[_ bolt] (.get_bolts ret)
            :let [common (.get_common bolt)]]
           (do
             (.put_to_streams common ACKER-ACK-STREAM-ID (thrift/output-fields ["id" "ack-val"]))
             (.put_to_streams common ACKER-FAIL-STREAM-ID (thrift/output-fields ["id"]))
             ))
    (dofor [[_ spout] (.get_spouts ret)
            :let [common (.get_common spout)
                  spout-conf (merge
                              (component-conf spout)
                              {TOPOLOGY-TICK-TUPLE-FREQ-SECS (storm-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)})]]
      (do
        ;; this set up tick tuples to cause timeouts to be triggered
        (.set_json_conf common (to-json spout-conf))
        (.put_to_streams common ACKER-INIT-STREAM-ID (thrift/output-fields ["id" "init-val" "spout-task"]))
        (.put_to_inputs common
                        (GlobalStreamId. ACKER-COMPONENT-ID ACKER-ACK-STREAM-ID)
                        (thrift/mk-direct-grouping))
        (.put_to_inputs common
                        (GlobalStreamId. ACKER-COMPONENT-ID ACKER-FAIL-STREAM-ID)
                        (thrift/mk-direct-grouping))
        ))
    (.put_to_bolts ret "__acker" acker-bolt)
    ))

The number of acker executors depends on whether "topology.acker.executors" is configured: if not, num-executors is bound to the value of "topology.workers", otherwise to "topology.acker.executors". acker-bolt is bound to the generated acker bolt object. acker-inputs is defined as:

(defn acker-inputs [^StormTopology topology]
  (let [bolt-ids (.. topology get_bolts keySet)
        spout-ids (.. topology get_spouts keySet)
        spout-inputs (apply merge
                            (for [id spout-ids]
                              {[id ACKER-INIT-STREAM-ID] ["id"]}
                              ))
        bolt-inputs (apply merge
                           (for [id bolt-ids]
                             {[id ACKER-ACK-STREAM-ID] ["id"]
                              [id ACKER-FAIL-STREAM-ID] ["id"]}
                             ))]
    (merge spout-inputs bolt-inputs)))
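The input map built by acker-inputs can be reproduced in Python (assuming the standard __ack_init/__ack_ack/__ack_fail stream ids):

```python
ACKER_INIT_STREAM_ID = "__ack_init"
ACKER_ACK_STREAM_ID = "__ack_ack"
ACKER_FAIL_STREAM_ID = "__ack_fail"

def acker_inputs(spout_ids, bolt_ids):
    # The acker subscribes to the init stream of every spout and to the
    # ack/fail streams of every bolt; each stream carries an "id" field.
    inputs = {}
    for sid in spout_ids:
        inputs[(sid, ACKER_INIT_STREAM_ID)] = ["id"]
    for bid in bolt_ids:
        inputs[(bid, ACKER_ACK_STREAM_ID)] = ["id"]
        inputs[(bid, ACKER_FAIL_STREAM_ID)] = ["id"]
    return inputs

inputs = acker_inputs(["spout"], ["split", "count"])
```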

bolt-ids binds the ids of all bolts in the topology, and spout-ids the ids of all spouts; spout-inputs binds the input streams coming from spouts, bolt-inputs those coming from bolts, and the merge of the two (a map) is returned. ACKER-ACK-STREAM-ID and ACKER-FAIL-STREAM-ID are the acker's output streams. TOPOLOGY-TICK-TUPLE-FREQ-SECS is the tick-tuple frequency, initialized to the message timeout. The first dofor adds the ACKER-ACK-STREAM-ID and ACKER-FAIL-STREAM-ID output streams to every bolt so it can send ack values to the acker bolt; the second dofor sets each spout's tick-tuple frequency and wires up the spout's ACKER-INIT-STREAM-ID output stream plus the two input streams coming from the acker bolt. With this in place, the acker bolt can exchange ack messages with the spouts and bolts. add-metric-components! adds the metric bolts to the topology definition; a metric bolt collects executor-level statistics. add-metric-components! is defined as:

(defn add-metric-components! [storm-conf ^StormTopology topology]
  (doseq [[comp-id bolt-spec] (metrics-consumer-bolt-specs storm-conf topology)]
    (.put_to_bolts topology comp-id bolt-spec)))

The metrics-consumer-bolt-specs function is defined as:

(defn metrics-consumer-bolt-specs [storm-conf topology]
  (let [component-ids-that-emit-metrics (cons SYSTEM-COMPONENT-ID (keys (all-components topology)))
        inputs (->> (for [comp-id component-ids-that-emit-metrics]
                      {[comp-id METRICS-STREAM-ID] :shuffle})
                    (into {}))

        mk-bolt-spec (fn [class arg p]
                       (thrift/mk-bolt-spec*
                        inputs
                        (backtype.storm.metric.MetricsConsumerBolt. class arg)
                        {} :p p :conf {TOPOLOGY-TASKS p}))]

    (map
     (fn [component-id register]
       [component-id (mk-bolt-spec (get register "class")
                                   (get register "argument")
                                   (or (get register "parallelism.hint") 1))])

     (metrics-consumer-register-ids storm-conf)
     (get storm-conf TOPOLOGY-METRICS-CONSUMER-REGISTER))))

component-ids-that-emit-metrics binds the ids of all spouts and bolts, including the system bolt; inputs binds the metric bolt's input streams, using shuffle grouping. mk-bolt-spec binds an anonymous function; metrics-consumer-register-ids produces a component id for each metric consumer; get returns all registered metric consumers; and map returns a list of component-id/metric-consumer pairs ([component-id metric-consumer] [component-id metric-consumer] ......). add-system-components! adds the system bolt to the topology definition. The system bolt collects worker-process-level statistics such as memory usage, GC activity and network throughput; there is exactly one system bolt per worker process. add-system-components! is defined as:

(defn add-system-components! [conf ^StormTopology topology]
  (let [system-bolt-spec (thrift/mk-bolt-spec*
                          {}
                          (SystemBolt.)
                          {SYSTEM-TICK-STREAM-ID (thrift/output-fields ["rate_secs"])
                           METRICS-TICK-STREAM-ID (thrift/output-fields ["interval"])}
                          :p 0
                          :conf {TOPOLOGY-TASKS 0})]
    (.put_to_bolts topology SYSTEM-COMPONENT-ID system-bolt-spec)))

The first argument {} of thrift/mk-bolt-spec* shows that the system bolt has no input streams, and the third argument shows it has two output streams used for tick tuples. Its parallelism is 0: the system bolt is tied to the worker process, so there is no point specifying parallelism, and it executes no tasks either. add-metric-streams! adds the metric stream definitions to the topology. It is defined as:

(defn add-metric-streams! [^StormTopology topology]
  (doseq [[_ component] (all-components topology)
          :let [common (.get_common component)]]
    (.put_to_streams common METRICS-STREAM-ID
                     (thrift/output-fields ["task-info" "data-points"]))))

It adds the metric stream identified by METRICS-STREAM-ID to every spout and bolt. add-system-streams! works the same way, adding the system stream identified by SYSTEM-STREAM-ID. After calling system-topology!, submitTopologyWithOpts takes the submit lock and calls setup-storm-code, which copies the jar uploaded to nimbus, the topology and the configuration into the {storm.local.dir}/nimbus/stormdist/{topology id} directory. It is defined as:

(defn- setup-storm-code [conf storm-id tmp-jar-location storm-conf topology]
  (let [stormroot (master-stormdist-root conf storm-id)]
    (FileUtils/forceMkdir (File. stormroot))
    (FileUtils/cleanDirectory (File. stormroot))
    (setup-jar conf tmp-jar-location stormroot)
    (FileUtils/writeByteArrayToFile (File. (master-stormcode-path stormroot)) (Utils/serialize topology))
    (FileUtils/writeByteArrayToFile (File. (master-stormconf-path stormroot)) (Utils/serialize storm-conf))
    ))

setup-jar copies the jar from {storm.local.dir}/nimbus/inbox/ into {storm.local.dir}/nimbus/stormdist/{topology id} and renames it stormjar.jar. FileUtils/writeByteArrayToFile serializes the topology object and storm-conf into stormcode.ser and stormconf.ser respectively. setup-heartbeats!, defined in cluster.clj as part of the StormClusterState protocol, creates the topology's heartbeat directory in zookeeper:
/storm/workerbeats/{topology id}/.
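The directory layout written by setup-storm-code can be sketched as:

```python
import os

def stormdist_paths(storm_local_dir, storm_id):
    # Files under {storm.local.dir}/nimbus/stormdist/{storm-id}/ after setup-storm-code.
    root = os.path.join(storm_local_dir, "nimbus", "stormdist", storm_id)
    return {
        "jar": os.path.join(root, "stormjar.jar"),    # copied from the inbox
        "code": os.path.join(root, "stormcode.ser"),  # serialized StormTopology
        "conf": os.path.join(root, "stormconf.ser"),  # serialized topology conf
    }

paths = stormdist_paths("/var/storm", "word-count-1-1700000000")
```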
start-storm reads the cluster configuration and nimbus's configuration, deserializes the topology configuration from stormconf.ser and the topology from stormcode.ser, then calls activate-storm! to write the topology's StormBase metadata into /storm/storms/{topology id} in zookeeper. It is defined as:

(defn- start-storm [nimbus storm-name storm-id topology-initial-status]
  {:pre [(#{:active :inactive} topology-initial-status)]}
  (let [storm-cluster-state (:storm-cluster-state nimbus)
        conf (:conf nimbus)
        storm-conf (read-storm-conf conf storm-id)
        topology (system-topology! storm-conf (read-storm-topology conf storm-id))
        num-executors (->> (all-components topology) (map-val num-start-executors))]
    (log-message "Activating " storm-name ": " storm-id)
    (.activate-storm! storm-cluster-state
                      storm-id
                      (StormBase. storm-name
                                  (current-time-secs)
                                  {:type topology-initial-status}
                                  (storm-conf TOPOLOGY-WORKERS)
                                  num-executors))))

Finally, submitTopologyWithOpts calls mk-assignments to perform task assignment. Task assignment is an important part of Storm's architecture; for reasons of length, its source code will be analyzed in a later article.

