Apache Storm技術實戰之1 -- WordCountTopology


歡迎轉載,轉載請注意出處,徽滬一郎。

“源碼走讀系列”從代碼層面分析了storm的具體實現,接下來通過具體的實例來說明storm的使用。因為目前storm已經正式遷移到Apache,文章系列也由twitter storm轉為apache storm.

WordCountTopology 使用storm來統計文件中的每個單詞的出現次數。

通過該例子來說明tuple發送時的幾個要素

  1. source component   發送源
  2. destination component 接收者
  3. stream 消息通道
  4. tuple    消息本身

本文涉及到的開發環境搭建可以參考前面的兩篇博文。

  1. arch linux簡明安裝指南
  2. 在archlinux上搭建storm cluster

awk實現

其實對文件中的單詞進行統計是Linux下一個很常見的任務,用awk就可以輕松的解決(如果文件不是太大的話),下面是進行word counting的awk腳本,將其保存為名為wordcount.awk文件。

wordcount.awk

{
  for (i = 1; i<=NF; i++)
    freq[$i]++
}
END{
  for (word in freq)
    printf "%s\t%d\n",word,freq[word]
}

運行該腳本,對文件中的單詞進行統計

gawk -f wordcount.awk filename

原始版本

從github上復制內容

git clone https://github.com/nathanmarz/storm-starter.git

編譯運行

lein deps
lein compile
java -cp $(lein classpath) WordCountTopology

main函數

main函數的主要內容

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("spout", new RandomSentenceSpout(), 5);
    builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
    builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

注意:grouping操作的時候,如果沒有顯示指定stream id,則使用的是default stream. 如shuffleGrouping("spout")表示從名為spout的component中接收從default stream發送過來的tuple.

改進版本

在原始版本中,spout不停的向split bolt隨機發送句子,Count bolt統計每個單詞出現的次數。

那么能不能讓Spout在讀取完文件之后,通知下游的bolt顯示最柊的統計結果呢?

要想達到上述的改進目標,采用如上圖所示的結構即可。改變的地方如下,

  1. 在Spout中添加一個SUCCESS_STREAM
  2. 添加只有一個運行實例的statistics bolt
  3. 當spout讀取完文件內容之后,通過SUCCESS_STREAM告訴statistics bolt,文件已經處理完畢,可以打印當前的統計結果

RandomSentenceSpout.java

declareOutputFields

添加SUCCESS_STREAM

@Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
    declarer.declareStream("SUCCESS_STREAM",new Fields("word"));
  }

nextTuple

使用SUCCESS_STREAM通知下游,文件處理完畢

@Override
  public void nextTuple() {
    Utils.sleep(100);
    String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
        "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
    if ( count == sentences.length ) 
    {
      System.out.println(count+" try to emit tuple by success_stream");
      _collector.emit("SUCCESS_STREAM",new Values(sentences[0]));
      count++;
    }else if ( count < sentences.length ){
      _collector.emit(new Values(sentences[count]));
      count++;
    }
  }

WordCountTopology.java

添加靜態類WordCount2

public static class WordCount2 extends BaseBasicBolt {
    Map<String, Integer> counts = new HashMap<String, Integer>();

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
      if ( tuple.getSourceStreamId() == "SUCCESS_STREAM" ) {
    System.out.println("prepare to print the statistics");
    for (String key : counts.keySet()) {
      System.out.println(key+"\t"+counts.get(key));
    }
    System.out.println("finish printing");
      }else {

    String word = tuple.getString(0);
    Integer count = counts.get(word);
    if (count == null)
      count = 0;
    count++;
    counts.put(word, count);
      }
    }

 

main函數

將spout的並行數由5改為1

 builder.setSpout("spout", new RandomSentenceSpout(), 1);

在原有的Topology中添加WordCount2 Bolt

 builder.setBolt("count2", new WordCount2(), 1).globalGrouping("count").globalGrouping("spout","SUCCESS_STREAM");

WordCount2 Bolt會接收從Count Bolt通過default stream發送的tuple,同時接收Spout通過SUCCESS_STREAM發送的tuple,也就是說wordcount2會接收從兩個stream來的數據。

編譯

編譯修改后的源文件

cd $STROM_STARTER
lein compile storm.starter

可能會出現以下異常信息,該異常可以忽略。

Exception in thread "main" java.io.FileNotFoundException: Could not locate storm/starter/WordCountTopology__init.class or storm/starter/WordCountTopology.clj on classpath:

運行

在local模式下運行修改后的WordCountTopology

java -cp $(lein classpath) storm.starter.WordCountTopology

如果一切正常,日志如下所示,線程的名字可能會有所不同。

moon    1
score    1
cow    1
doctor    1
over    1
nature    1
snow    1
four    1
keeps    1
with    1
a    1
white    1
dwarfs    1
at    1
the    4
and    2
i    1
two    1
away    1
seven    2
apple    1
am    1
an    1
jumped    1
day    1
years    1
ago    1

 結果驗證

可以將WordCountTopology的運行結果和awk腳本的運行結果相比對,結果應該是一致的。

小技巧

  1. awk腳本的執行結果存為一個文件result1.log, WordCountTopology的輸出中單詞統計部分存為result2.log
  2. 用vim打開result1.log,進行sorting,保存結果;用vim打開result2.log,進行sorting,保存。
  3. 然后用vimdiff來進行比較 vimdiff result1.log result2.log


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM