Storm的BaseBasicBolt源碼解析ack機制


我們在學習ack機制的時候,我們知道Storm的Bolt有BaseBasicBolt和BaseRichBolt。
在BaseBasicBolt中,BasicOutputCollector在emit數據的時候,會自動和輸入的tuple相關聯,而在execute方法結束的時候那個輸入tuple會被自動ack。
在使用BaseRichBolt需要在emit數據的時候,顯示指定該數據的源tuple要加上第二個參數anchor tuple,以保持tracker鏈路,即collector.emit(oldTuple, newTuple);並且需要在execute執行成功后調用OutputCollector.ack(tuple), 當失敗處理時,執行OutputCollector.fail(tuple);

那么我們來看看BasicBolt的源碼是不是這樣的,不能因為看到別人的帖子說是這樣的,我們就這樣任務,以訛傳訛,我們要To see is to believe。

 

為了方便看源代碼,我先上我們的繼承類:

public class SplitSentenceBolt extends BaseBasicBolt { public void prepare(Map stormConf, TopologyContext context) { super.prepare(stormConf, context); } 
  //5:執行我們自己的邏輯處理方法,接收傳入的參數。
  
public void execute(Tuple input, BasicOutputCollector collector) { String sentence = (String)input.getValueByField("sentence"); String[] words = sentence.split(" "); for (String word : words) { word = word.trim(); word = word.toLowerCase(); collector.emit(new Values(word,1));//這個地方就是調用OutputCollector的包裝類,來發消息 } } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word","num")); } }

 



通過打斷點,我們發現,bolt的task會創建這個類下面會標准執行順序

public class BasicBoltExecutor implements IRichBolt { public static Logger LOG = LoggerFactory.getLogger(BasicBoltExecutor.class); private IBasicBolt _bolt; private transient BasicOutputCollector _collector;  //1:創建該對象,然后把我們寫的SplitSentenceBolt對象賦給父類IBasicBolt。 public BasicBoltExecutor(IBasicBolt bolt) { _bolt = bolt; } public void declareOutputFields(OutputFieldsDeclarer declarer) { _bolt.declareOutputFields(declarer);//這里就是調用SplitSentenceBolt對象的方法了。 }  //2:給BasicOutputCollector _collector字段賦值,BasicOutputCollector就是對OutputCollector類的包裝。 public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { _bolt.prepare(stormConf, context); _collector = new BasicOutputCollector(collector); }   //3:然后程序執行該方法,input的值source: spout1:4, stream: default, id: {}, [+ - * % /]
    public void execute(Tuple input) { _collector.setContext(input);//把接收到的tuple值設置給BasicOutputCollector中inputTuple字段。
        try { _bolt.execute(input, _collector);//這個地方是調用我們實現類SplitSentenceBolt的ececute方法。
            _collector.getOutputter().ack(input);//這個地方就是響應
        } catch(FailedException e) { if(e instanceof ReportedFailedException) { _collector.reportError(e); } _collector.getOutputter().fail(input);//這個地方就是響應
 } } public void cleanup() { _bolt.cleanup(); } public Map<String, Object> getComponentConfiguration() { return _bolt.getComponentConfiguration(); } }

 

 

public class BasicOutputCollector implements IBasicOutputCollector { private OutputCollector out; private Tuple inputTuple; public BasicOutputCollector(OutputCollector out) { this.out = out; }
//4:把收到的tuple數據賦值給inputTuple,這個時候BasicOutputCollector對象的字段都具有值了。
  public void setContext(Tuple inputTuple) { this.inputTuple = inputTuple; }
   //6:這里我們發送新的(轉換后的)tuple數據,看他內部的調用,其實他也會發送一個anchor tuple保持tracker鏈路
而這個anchor tuple就是bolt接收到轉換前的源tuple數據。

  public List<Integer> emit(List<Object> tuple) {
     return emit(Utils.DEFAULT_STREAM_ID, tuple);
   }
public List<Integer> emit(String streamId, List<Object> tuple) { return out.emit(streamId, inputTuple, tuple); } public void emitDirect(int taskId, String streamId, List<Object> tuple) { out.emitDirect(taskId, streamId, inputTuple, tuple); } public void emitDirect(int taskId, List<Object> tuple) { emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple); } protected IOutputCollector getOutputter() { return out; } public void reportError(Throwable t) { out.reportError(t); } }

這里大家不要糾結bolt的啟動時從哪里開始的,我后面會講的,這里我們關注的是,BasicBoltExecutor對象創建后的執行過程,以這我們來看執行的過程。在BasicBoltExecutor的execute方法中,我們看到了ack和fail方法會被自動調用的,當我們的程序拋出異常則會執行fail方法的。

這個

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM