在Storm/JStorm中有一個特別重要的類,主要用來構建Topology,這個類就是TopologyBuilder。
咱先看一下簡單的例子:
public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("input", new RandomSentenceSpout(), 2); builder.setBolt("bolt_sentence", new SplitSentenceBolt(), 2) .shuffleGrouping("input"); // 本地模式:最主要用來調試用 LocalCluster cluster = new LocalCluster(); System.out.println("start wordcount"); cluster.submitTopology("word count", conf, builder.createTopology()); }
在上面的main方法里先創建TopologyBuilder對象,然后設置好已創建的Spout節點和Bolt節點,並用隨機分組(shuffleGrouping)將Spout和Bolt節點連接起來形成Topology。
那TopologyBuilder是如何做的呢?請看下面TopologyBuilder源代碼:
/** * TopologyBuilder是一個用於構建Topology的工具類 * */ public class TopologyBuilder { /** * 定義了類成員變量_bolts,用來存放IRichBolt類型的所有Bolt對象 */ private Map<String, IRichBolt> _bolts = new HashMap<String, IRichBolt>(); /** * 定義了類成員變量_spouts,用來存放IRichSpout類型的所有Spout對象 */ private Map<String, IRichSpout> _spouts = new HashMap<String, IRichSpout>(); /** * 定義了類成員變量_commons,存放了所有的Bolt和Spout對象 */ private Map<String, ComponentCommon> _commons = new HashMap<String, ComponentCommon>(); // private Map<String, Map<GlobalStreamId, Grouping>> _inputs = new HashMap<String, Map<GlobalStreamId, Grouping>>(); private Map<String, StateSpoutSpec> _stateSpouts = new HashMap<String, StateSpoutSpec>(); /** * 根據傳入的Bolt和Spout對象構建StormTopology對象 * @return */ public StormTopology createTopology() { Map<String, Bolt> boltSpecs = new HashMap<String, Bolt>(); Map<String, SpoutSpec> spoutSpecs = new HashMap<String, SpoutSpec>(); for (String boltId : _bolts.keySet()) { //根據boltId從_bolts中獲取到對應的bolt對象 IRichBolt bolt = _bolts.get(boltId); //設置對應ComponentCommon對象的streams(輸出的字段列表以及是否是直接流)屬性值 ComponentCommon common = getComponentCommon(boltId, bolt); /** * 先將Bolts對象序列化得到數組,再創建Bolt對象,所以所有在StormTopology中Bolts是對象序列化過后得到的字節數組. */ boltSpecs.put(boltId, new Bolt(ComponentObject.serialized_java(Utils.javaSerialize(bolt)), common)); } for (String spoutId : _spouts.keySet()) { //根據spoutId從_spouts中獲取到對應的spout對象 IRichSpout spout = _spouts.get(spoutId); //設置對應ComponentCommon對象的streams(輸出的字段列表以及是否是直接流) ComponentCommon common = getComponentCommon(spoutId, spout); /** * 先將Spout對象序列化得到數組,再創建SpoutSpec對象,所以所有在StormTopology中Spouts是對象序列化過后得到的字節數組. 
*/ spoutSpecs.put(spoutId, new SpoutSpec(ComponentObject.serialized_java(Utils.javaSerialize(spout)), common)); } //將上述所設置的所有組件都封裝到StormTopology對象中,最后提交到集群中運行 return new StormTopology(spoutSpecs, boltSpecs, new HashMap<String, StateSpoutSpec>()); } /** * 下面幾個方法定義了setBolt方法以及它的重載方法 */ /** * 在這個topology中定義一個只有單線程並行度的新的bolt * 其它想要消耗這個bolt的輸出的組件會引用這個id */ public BoltDeclarer setBolt(String id, IRichBolt bolt) { return setBolt(id, bolt, null); } /** * 為這個topology定義一個指定數量的並行度的bolt */ public BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism_hint) { //檢測傳入的組件id是否唯一 validateUnusedId(id); //生成common對象 initCommon(id, bolt, parallelism_hint); _bolts.put(id, bolt); return new BoltGetter(id); } public BoltDeclarer setBolt(String id, IBasicBolt bolt) { return setBolt(id, bolt, null); } public BoltDeclarer setBolt(String id, IBasicBolt bolt, Number parallelism_hint) { /** * 該方法利用BasicBoltExecutor包裝(封裝)傳入的IBasicBolt對象 * 在BasicBoltExecutor中實現了對消息的追蹤 */ return setBolt(id, new BasicBoltExecutor(bolt), parallelism_hint); } /** * 下面幾個方法定義了setSpout方法以及它的重載方法 */ public SpoutDeclarer setSpout(String id, IRichSpout spout) { return setSpout(id, spout, null); } public SpoutDeclarer setSpout(String id, IRichSpout spout, Number parallelism_hint) { //檢測輸入的id是否唯一,若已經存在將拋出異常 validateUnusedId(id); /** * 構建ComponentCommon對象並進行相對應的初始化,最后放入到_commons(在上述中已經定義) */ initCommon(id, spout, parallelism_hint); _spouts.put(id, spout); return new SpoutGetter(id); } public SpoutDeclarer setSpout(String id, IControlSpout spout) { return setSpout(id, spout, null); } public SpoutDeclarer setSpout(String id, IControlSpout spout, Number parallelism_hint) { return setSpout(id, new ControlSpoutExecutor(spout), parallelism_hint); } public BoltDeclarer setBolt(String id, IControlBolt bolt, Number parallelism_hint) { return setBolt(id, new ControlBoltExecutor(bolt), parallelism_hint); } public BoltDeclarer setBolt(String id, IControlBolt bolt) { return setBolt(id, bolt, null); } public void 
setStateSpout(String id, IRichStateSpout stateSpout) { setStateSpout(id, stateSpout, null); } public void setStateSpout(String id, IRichStateSpout stateSpout, Number parallelism_hint) { validateUnusedId(id); // TODO: finish } /** * 檢測輸入的id是否唯一 * @param id */ private void validateUnusedId(String id) { if (_bolts.containsKey(id)) { throw new IllegalArgumentException("Bolt has already been declared for id " + id); } if (_spouts.containsKey(id)) { throw new IllegalArgumentException("Spout has already been declared for id " + id); } if (_stateSpouts.containsKey(id)) { throw new IllegalArgumentException("State spout has already been declared for id " + id); } } private ComponentCommon getComponentCommon(String id, IComponent component) { ComponentCommon ret = new ComponentCommon(_commons.get(id)); OutputFieldsGetter getter = new OutputFieldsGetter(); component.declareOutputFields(getter); ret.set_streams(getter.getFieldsDeclaration()); return ret; } /** * 定義了initCommon方法,用來初始化變量CommonentCommon對象,並給類成員變量_commons賦值 * 初始化所做的工作:設置並行度還有一些其它配置 * @param id * @param component * @param parallelism */ private void initCommon(String id, IComponent component, Number parallelism) { ComponentCommon common = new ComponentCommon(); //設置消息流的來源及分組方式 common.set_inputs(new HashMap<GlobalStreamId, Grouping>()); if (parallelism != null) { //設置並行度 common.set_parallelism_hint(parallelism.intValue()); } else { //如果並行度沒有手動設置則默認為1 common.set_parallelism_hint(1); } Map conf = component.getComponentConfiguration(); if (conf != null) //設置組件的配置參數 common.set_json_conf(JSONValue.toJSONString(conf)); _commons.put(id, common); } }
從上面TopologyBuilder的類中可以看到這個類提供了創建StormTopology的方法以及一些數據源節點和處理節點的相關設置的方法,
還有就是用來存儲Bolt對象和Spout對象的成員變量,當然這里關於分組的代碼(即setBolt/setSpout所返回的BoltGetter和SpoutGetter的實現)沒有寫出來。事實上這個類就是用來設置Spout節點和Bolt節點,
並通過分組方式將Spout和Bolt節點連接起來形成拓撲結構的。