Storm/JStorm之TopologyBuilder源碼閱讀


在Storm/JStorm中有一個特別重要的類,主要用來構建Topology,這個類就是TopologyBuilder。
咱先看一下簡單的例子:

/**
 * Wires a spout and a bolt into a topology and submits it to a local cluster.
 *
 * <p>The spout is registered under the id {@code "input"} and the bolt under
 * {@code "bolt_sentence"}, each with a parallelism hint of 2. The bolt
 * subscribes to the spout through {@code shuffleGrouping("input")}.
 *
 * @param args command-line arguments (unused)
 * @throws AlreadyAliveException    declared by the original signature; presumably
 *                                  raised by {@code submitTopology} - confirm against the API
 * @throws InvalidTopologyException declared by the original signature; presumably
 *                                  raised by {@code submitTopology} - confirm against the API
 */
public static void main(String[] args) throws AlreadyAliveException,
            InvalidTopologyException {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("input", new RandomSentenceSpout(), 2);
        builder.setBolt("bolt_sentence", new SplitSentenceBolt(), 2)
                .shuffleGrouping("input");

        // The original snippet passed an undeclared `conf` to submitTopology.
        // Declare an empty topology configuration so the example compiles.
        // NOTE(review): Storm examples usually use `new Config()` here; a plain
        // map is used so the snippet needs no additional Storm import - confirm
        // that submitTopology accepts a Map in the Storm/JStorm version in use.
        java.util.Map<String, Object> conf = new java.util.HashMap<String, Object>();

        // Local mode: mainly used for debugging.
        LocalCluster cluster = new LocalCluster();
        System.out.println("start wordcount");
        cluster.submitTopology("word count", conf, builder.createTopology());
    }

在上面的main方法裡先創建TopologyBuilder對象,然後設置好已創建的Spout節點和Bolt節點,並用隨機分組(shuffleGrouping)將Spout和Bolt節點連接起來形成Topology。

那TopologyBuilder是如何做的呢?請看下面TopologyBuilder源代碼:

/**
 * TopologyBuilder是一個用於構建Topology的工具類
 *
 */
public class TopologyBuilder {
    /**
     * 定義了類成員變量_bolts,用來存放IRichBolt類型的所有Bolt對象
     */
    private Map<String, IRichBolt> _bolts = new HashMap<String, IRichBolt>();
    /**
     * 定義了類成員變量_spouts,用來存放IRichSpout類型的所有Spout對象
     */
    private Map<String, IRichSpout> _spouts = new HashMap<String, IRichSpout>();
    /**
     * 定義了類成員變量_commons,存放了所有的Bolt和Spout對象
     */
    private Map<String, ComponentCommon> _commons = new HashMap<String, ComponentCommon>();

    // private Map<String, Map<GlobalStreamId, Grouping>> _inputs = new HashMap<String, Map<GlobalStreamId, Grouping>>();

    private Map<String, StateSpoutSpec> _stateSpouts = new HashMap<String, StateSpoutSpec>();
    /**
     * 根據傳入的Bolt和Spout對象構建StormTopology對象
     * @return
     */
public StormTopology createTopology() {
        Map<String, Bolt> boltSpecs = new HashMap<String, Bolt>();
        Map<String, SpoutSpec> spoutSpecs = new HashMap<String, SpoutSpec>();
        for (String boltId : _bolts.keySet()) {
            //根據boltId從_bolts中獲取到對應的bolt對象
            IRichBolt bolt = _bolts.get(boltId);
            //設置對應ComponentCommon對象的streams(輸出的字段列表以及是否是直接流)屬性值
            ComponentCommon common = getComponentCommon(boltId, bolt);
            /**
             * 先將Bolts對象序列化得到數組,再創建Bolt對象,所以所有在StormTopology中Bolts是對象序列化過后得到的字節數組.
             */
            boltSpecs.put(boltId, new Bolt(ComponentObject.serialized_java(Utils.javaSerialize(bolt)), common));
        }
        for (String spoutId : _spouts.keySet()) {
            //根據spoutId從_spouts中獲取到對應的spout對象
            IRichSpout spout = _spouts.get(spoutId);
            //設置對應ComponentCommon對象的streams(輸出的字段列表以及是否是直接流)
            ComponentCommon common = getComponentCommon(spoutId, spout);
            /**
             * 先將Spout對象序列化得到數組,再創建SpoutSpec對象,所以所有在StormTopology中Spouts是對象序列化過后得到的字節數組.
             */
            spoutSpecs.put(spoutId, new SpoutSpec(ComponentObject.serialized_java(Utils.javaSerialize(spout)), common));

        }
        //將上述所設置的所有組件都封裝到StormTopology對象中,最后提交到集群中運行
        return new StormTopology(spoutSpecs, boltSpecs, new HashMap<String, StateSpoutSpec>());
    }
    /**
     * 下面幾個方法定義了setBolt方法以及它的重載方法
     */
    /**
     * 在這個topology中定義一個只有單線程並行度的新的bolt
     * 其它想要消耗這個bolt的輸出的組件會引用這個id
     */
   public BoltDeclarer setBolt(String id, IRichBolt bolt) {
        return setBolt(id, bolt, null);
    }

    /**
     *  為這個topology定義一個指定數量的並行度的bolt
     */
    public BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism_hint) {
        //檢測傳入的組件id是否唯一
        validateUnusedId(id);
        //生成common對象
        initCommon(id, bolt, parallelism_hint);
        _bolts.put(id, bolt);
        return new BoltGetter(id);
    }


    public BoltDeclarer setBolt(String id, IBasicBolt bolt) {
        return setBolt(id, bolt, null);
    }


    public BoltDeclarer setBolt(String id, IBasicBolt bolt, Number parallelism_hint) {
        /**
         * 該方法利用BasicBoltExecutor包裝(封裝)傳入的IBasicBolt對象
         * 在BasicBoltExecutor中實現了對消息的追蹤
         */
        return setBolt(id, new BasicBoltExecutor(bolt), parallelism_hint);
    }
  /**
     * 下面幾個方法定義了setSpout方法以及它的重載方法
     */
    public SpoutDeclarer setSpout(String id, IRichSpout spout) {
        return setSpout(id, spout, null);
    }

    public SpoutDeclarer setSpout(String id, IRichSpout spout, Number parallelism_hint) {
        //檢測輸入的id是否唯一,若已經存在將拋出異常
        validateUnusedId(id);
        /**
         * 構建ComponentCommon對象並進行相對應的初始化,最后放入到_commons(在上述中已經定義)
         */
        initCommon(id, spout, parallelism_hint);
        _spouts.put(id, spout);
        return new SpoutGetter(id);
    }


    public SpoutDeclarer setSpout(String id, IControlSpout spout) {
        return setSpout(id, spout, null);
    }

    public SpoutDeclarer setSpout(String id, IControlSpout spout, Number parallelism_hint) {
        return setSpout(id, new ControlSpoutExecutor(spout), parallelism_hint);
    }

    public BoltDeclarer setBolt(String id, IControlBolt bolt, Number parallelism_hint) {
        return setBolt(id, new ControlBoltExecutor(bolt), parallelism_hint);
    }
    public BoltDeclarer setBolt(String id, IControlBolt bolt) {
        return setBolt(id, bolt, null);
    }

    public void setStateSpout(String id, IRichStateSpout stateSpout) {
        setStateSpout(id, stateSpout, null);
    }
    public void setStateSpout(String id, IRichStateSpout stateSpout, Number parallelism_hint) {
        validateUnusedId(id);
        // TODO: finish
    }
    /**
     * 檢測輸入的id是否唯一
     * @param id
     */
    private void validateUnusedId(String id) {
        if (_bolts.containsKey(id)) {
            throw new IllegalArgumentException("Bolt has already been declared for id " + id);
        }
        if (_spouts.containsKey(id)) {
            throw new IllegalArgumentException("Spout has already been declared for id " + id);
        }
        if (_stateSpouts.containsKey(id)) {
            throw new IllegalArgumentException("State spout has already been declared for id " + id);
        }
    }

    private ComponentCommon getComponentCommon(String id, IComponent component) {
        ComponentCommon ret = new ComponentCommon(_commons.get(id));

        OutputFieldsGetter getter = new OutputFieldsGetter();
        component.declareOutputFields(getter);
        ret.set_streams(getter.getFieldsDeclaration());
        return ret;
    }
    /**
     * 定義了initCommon方法,用來初始化變量CommonentCommon對象,並給類成員變量_commons賦值
     * 初始化所做的工作:設置並行度還有一些其它配置
     * @param id
     * @param component
     * @param parallelism
     */
private void initCommon(String id, IComponent component, Number parallelism) {
        ComponentCommon common = new ComponentCommon();
        //設置消息流的來源及分組方式
        common.set_inputs(new HashMap<GlobalStreamId, Grouping>());
        if (parallelism != null) {
            //設置並行度
            common.set_parallelism_hint(parallelism.intValue());
        } else {
            //如果並行度沒有手動設置則默認為1
            common.set_parallelism_hint(1);
        }
        Map conf = component.getComponentConfiguration();
        if (conf != null)
            //設置組件的配置參數
            common.set_json_conf(JSONValue.toJSONString(conf));
        _commons.put(id, common);
    }
}

從上面TopologyBuilder的類中可以看到這個類提供了創建StormTopology的方法以及一些數據源節點和處理節點的相關設置的方法,

還有用來存儲Bolt對象和Spout對象的成員變量,當然這裡關於分組的代碼沒有寫出來。事實上這個類就是用來設置Spout節點和Bolt節點,

並通過分組方式將Spout和Bolt節點連接起來形成拓撲結構的。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM