Storm Source Code Analysis: Topology Submit (Client)


1 Storm Client

A topology is first launched with the storm command, as follows:

storm jar storm-starter-0.0.1-SNAPSHOT-standalone.jar storm.starter.WordCountTopology

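The storm command is implemented in Python. Its jar function is simple: it calls exec_storm_class with jvmtype="-client".
exec_storm_class just assembles a java command line and then runs it with os.system(command). Why write it in Python? Perhaps for simplicity, and so that the storm command can be used directly.
Here klass is the topology class, so the java command simply invokes the main function of the topology class.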

def jar(jarfile, klass, *args):
    """Syntax: [storm jar topology-jar-path class ...]

Runs the main method of class with the specified arguments.
The storm jars and configs in ~/.storm are put on the classpath.
The process is configured so that StormSubmitter
(http://nathanmarz.github.com/storm/doc/backtype/storm/StormSubmitter.html)
will upload the jar at topology-jar-path when the topology is submitted.
"""
    exec_storm_class(
        klass,
        jvmtype="-client",
        extrajars=[jarfile, CONF_DIR, STORM_DIR + "/bin"],
        args=args,
        childopts="-Dstorm.jar=" + jarfile)

def exec_storm_class(klass, jvmtype="-server", childopts="", extrajars=[], args=[]):
    nativepath = confvalue("java.library.path", extrajars)
    args_str = " ".join(map(lambda s: "\"" + s + "\"", args))
    # Note the spaces after "java" and "-cp": without them the pieces fuse into an invalid command line.
    command = "java " + jvmtype + " -Dstorm.home=" + STORM_DIR + " " + get_config_opts() + " -Djava.library.path=" + nativepath + " " + childopts + " -cp " + get_classpath(extrajars) + " " + klass + " " + args_str
    print "Running: " + command
    os.system(command)
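The command assembly above can also be sketched with an argument list instead of one concatenated string, which avoids hand-written quoting. This is only an illustration, not the storm script itself: build_storm_command is a hypothetical helper, and the storm_home and classpath defaults are placeholder values.

```python
import shlex

def build_storm_command(klass, jarfile, args=(), jvmtype="-client",
                        storm_home="/opt/storm", classpath="storm.jar"):
    """Return the java command as an argument list (hypothetical helper)."""
    return [
        "java", jvmtype,
        "-Dstorm.home=" + storm_home,      # placeholder install directory
        "-Dstorm.jar=" + jarfile,          # tells the submitting process which jar to upload
        "-cp", classpath + ":" + jarfile,  # placeholder classpath plus the topology jar
        klass,                             # the topology class whose main() is run
    ] + list(args)

cmd = build_storm_command(
    "storm.starter.WordCountTopology",
    "storm-starter-0.0.1-SNAPSHOT-standalone.jar",
    args=["my topology"])

# Print a shell-safe rendering; the list itself could be passed to subprocess.call(cmd).
print(" ".join(shlex.quote(part) for part in cmd))
```

Because each argument is its own list element, an argument containing spaces (such as "my topology") needs no manual escaping.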

So what does the main function of the WordCountTopology example actually do?

Besides defining the topology, it ends by calling StormSubmitter.submitTopology(args[0], conf, builder.createTopology()) to submit the topology.

    public static void main(String[] args) throws Exception {        
        TopologyBuilder builder = new TopologyBuilder();        
        builder.setSpout("spout", new RandomSentenceSpout(), 5);        
        builder.setBolt("split", new SplitSentence(), 8)
                 .shuffleGrouping("spout");
        builder.setBolt("count", new WordCount(), 12)
                 .fieldsGrouping("split", new Fields("word"));

        Config conf = new Config();
        conf.setDebug(true);
      
        if(args!=null && args.length > 0) {
            conf.setNumWorkers(3);            
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {        
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-count", conf, builder.createTopology());   
            Thread.sleep(10000);
            cluster.shutdown();
        }
    }

StormSubmitter

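Let's go straight to submitTopology.
1. Configuration parameters
    Command-line options are put into stormConf, and the configuration parameters from conf/storm.yaml are read into conf. stormConf is then put into conf as well, so command-line options take precedence.
    stormConf is converted to JSON, because this configuration has to be sent to the server.

2. Submit Jar
    StormSubmitter is essentially a Thrift client and Nimbus is the Thrift server, so every operation goes through Thrift RPC. For background on Thrift, see the articles "Thrift" and "Storm Source Code Analysis: Use of Thrift".
    First, topologyNameExists uses the Thrift client to fetch the topologies currently running on the cluster and checks whether the name is already taken.
    Then the jar is submitted in the following three steps:
    String uploadLocation = client.getClient().beginFileUpload();
    client.getClient().uploadChunk(uploadLocation, ByteBuffer.wrap(toSubmit));
    client.getClient().finishFileUpload(uploadLocation);
    The data is sent over RPC; how it is stored is up to Nimbus's own logic.

3. Submit Topology
    This step is just a single RPC call:
    client.getClient().submitTopologyWithOpts(name, submittedJar, serConf, topology, opts);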
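The precedence described in step 1 can be illustrated with plain dictionaries. This is a minimal sketch of the merge order only, not Storm code: build_confs is a hypothetical helper, and the keys and values are sample data chosen for the example.

```python
import json

def build_confs(topology_conf, command_line_opts, storm_yaml_conf):
    """Mimic the merge order used when submitting (hypothetical helper)."""
    # stormConf: a copy of the topology conf, overridden by command-line options.
    storm_conf = dict(topology_conf)
    storm_conf.update(command_line_opts)
    # conf: values read from storm.yaml, overridden by everything in stormConf.
    conf = dict(storm_yaml_conf)
    conf.update(storm_conf)
    # Only stormConf is serialized to JSON and sent to the server.
    ser_conf = json.dumps(storm_conf, sort_keys=True)
    return conf, ser_conf

conf, ser_conf = build_confs(
    topology_conf={"topology.workers": 3, "topology.debug": True},
    command_line_opts={"topology.workers": 5},
    storm_yaml_conf={"nimbus.host": "localhost", "topology.workers": 1})

print(conf["topology.workers"])  # the command-line value wins
print(ser_conf)                  # contains no storm.yaml-only keys
```

The merged conf is what the client uses locally (for example, to locate Nimbus), while the serialized string carries only the topology-specific settings.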

    /**
     * Submits a topology to run on the cluster. A topology runs forever or until 
     * explicitly killed.
     *
     *
     * @param name the name of the storm.
     * @param stormConf the topology-specific configuration. See {@link Config}. 
     * @param topology the processing to execute.
     * @param opts options to manipulate the starting of the topology
     * @throws AlreadyAliveException if a topology with this name is already running
     * @throws InvalidTopologyException if an invalid topology was submitted
     */
    public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts) throws AlreadyAliveException, InvalidTopologyException {
        if(!Utils.isValidConf(stormConf)) {
            throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
        }
        stormConf = new HashMap(stormConf);
        stormConf.putAll(Utils.readCommandLineOpts());
        Map conf = Utils.readStormConfig();
        conf.putAll(stormConf);
        try {
            String serConf = JSONValue.toJSONString(stormConf);
            if(localNimbus!=null) {
                LOG.info("Submitting topology " + name + " in local mode");
                localNimbus.submitTopology(name, null, serConf, topology);
            } else {
                NimbusClient client = NimbusClient.getConfiguredClient(conf);
                if(topologyNameExists(conf, name)) {
                    throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
                }
                submitJar(conf);
                try {
                    LOG.info("Submitting topology " +  name + " in distributed mode with conf " + serConf);
                    if(opts!=null) {
                        client.getClient().submitTopologyWithOpts(name, submittedJar, serConf, topology, opts);                    
                    } else {
                        // this is for backwards compatibility
                        client.getClient().submitTopology(name, submittedJar, serConf, topology);                                            
                    }
                } catch(InvalidTopologyException e) {
                    LOG.warn("Topology submission exception", e);
                    throw e;
                } catch(AlreadyAliveException e) {
                    LOG.warn("Topology already alive exception", e);
                    throw e;
                } finally {
                    client.close();
                }
            }
            LOG.info("Finished submitting topology: " +  name);
        } catch(TException e) {
            throw new RuntimeException(e);
        }
    }
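The three-step jar upload (begin, upload chunks, finish) that precedes the final submit call can be sketched against an in-memory stand-in for the server. Everything here is an assumption made for illustration: FakeNimbus is a hypothetical class, not the Thrift-generated client, and the tiny chunk size is chosen only so that the loop runs several times.

```python
import io

class FakeNimbus:
    """In-memory stand-in for the Nimbus Thrift service (hypothetical)."""

    def __init__(self):
        self.uploads = {}      # upload location -> bytes received so far
        self.finished = set()  # locations whose upload has been closed

    def begin_file_upload(self):
        # The server picks the location and hands it back to the client.
        location = "upload-%d" % len(self.uploads)
        self.uploads[location] = bytearray()
        return location

    def upload_chunk(self, location, chunk):
        self.uploads[location].extend(chunk)

    def finish_file_upload(self, location):
        self.finished.add(location)

def submit_jar(client, stream, chunk_size=4):
    """Send a byte stream in fixed-size chunks and return the server-side location."""
    location = client.begin_file_upload()
    while True:
        chunk = stream.read(chunk_size)
        if not chunk:  # an empty read means end of stream
            break
        client.upload_chunk(location, chunk)
    client.finish_file_upload(location)
    return location

nimbus = FakeNimbus()
location = submit_jar(nimbus, io.BytesIO(b"pretend jar bytes"))
print(location, bytes(nimbus.uploads[location]))
```

The location returned by the first call plays the role of submittedJar in the listing above: it is what the client later passes to the submit call so the server can find the uploaded jar.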

