Storm中的很多Bolt都有一個最常見的處理步驟:
- 讀入一個tuple;
- 根據這個輸入tuple,提取后發射0個,1個或多個tuple;
- 最后,通過ack操作確認這個tuple被成功處理。
按照上述處理步驟,依次處理發向這個Bolt的各個tuple元組。
這種模式可以實現像ETL這類的簡單函數或過濾器功能,Storm中專門為這種模式封裝了相應接口:IBasicBolt。BaseBasicBolt等類實現了這一接口。
下面是以BaseBasicBolt為基礎,按照上述模式實現詞頻統計的Bolt(代碼參考鏈接:storm-starter):
public static class WordCount extends BaseBasicBolt { //記錄每個單詞及單詞出現的次數 Map<String, Integer> counts = new HashMap<String, Integer>(); @Override public void execute(Tuple tuple, BasicOutputCollector collector) { String word = tuple.getString(0); Integer count = counts.get(word); //提取單詞出現次數 if(count==null) count = 0; count++; counts.put(word, count); //更新單詞出現次數 collector.emit(new Values(word, count)); //發射統計結果 } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "count")); } }