Storm常見模式——BasicBolt


Storm中的很多Bolt都有一個最常見的處理步驟:

  1. 讀入一個tuple;
  2. 根據這個輸入tuple,提取后發射0個,1個或多個tuple;
  3. 最后,通過ack操作確認這個tuple被成功處理。

按照上述處理步驟,依次處理發向這個Bolt的各個tuple元組。

這種模式可以實現像ETL這類的簡單函數或過濾器功能,Storm中專門為這種模式封裝了相應接口:IBasicBoltBaseBasicBolt等類實現了這一接口。

下面是以BaseBasicBolt為基礎,按照上述模式實現詞頻統計的Bolt(代碼參考鏈接:storm-starter):

public static class WordCount extends BaseBasicBolt {
     //記錄每個單詞及單詞出現的次數
        Map<String, Integer> counts = new HashMap<String, Integer>();

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String word = tuple.getString(0);
            Integer count = counts.get(word); //提取單詞出現次數
            if(count==null)
         count = 0;
            count++;
            counts.put(word, count); //更新單詞出現次數
            collector.emit(new Values(word, count)); //發射統計結果
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
       declarer.declare(new Fields("word", "count"));
        }
    }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM