歡迎轉載,轉載請注意出處,徽滬一郎。
“源碼走讀系列”從代碼層面分析了storm的具體實現,接下來通過具體的實例來說明storm的使用。因為目前storm已經正式遷移到Apache,文章系列也由twitter storm轉為apache storm.
WordCountTopology 使用storm來統計文件中的每個單詞的出現次數。
通過該例子來說明tuple發送時的幾個要素
- source component 發送源
- destination component 接收者
- stream 消息通道
- tuple 消息本身
本文涉及到的開發環境搭建可以參考前面的兩篇博文。
awk實現
其實對文件中的單詞進行統計是Linux下一個很常見的任務,用awk就可以輕松的解決(如果文件不是太大的話),下面是進行word counting的awk腳本,將其保存為名為wordcount.awk文件。
wordcount.awk
{ for (i = 1; i<=NF; i++) freq[$i]++ } END{ for (word in freq) printf "%s\t%d\n",word,freq[word] }
運行該腳本,對文件中的單詞進行統計
gawk -f wordcount.awk filename
原始版本
從github上復制內容
git clone https://github.com/nathanmarz/storm-starter.git
編譯運行
lein deps lein compile java -cp $(lein classpath) WordCountTopology
main函數
main函數的主要內容
TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new RandomSentenceSpout(), 5); builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout"); builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
注意:grouping操作的時候,如果沒有顯示指定stream id,則使用的是default stream. 如shuffleGrouping("spout")表示從名為spout的component中接收從default stream發送過來的tuple.
改進版本
在原始版本中,spout不停的向split bolt隨機發送句子,Count bolt統計每個單詞出現的次數。
那么能不能讓Spout在讀取完文件之后,通知下游的bolt顯示最柊的統計結果呢?
要想達到上述的改進目標,采用如上圖所示的結構即可。改變的地方如下,
- 在Spout中添加一個SUCCESS_STREAM
- 添加只有一個運行實例的statistics bolt
- 當spout讀取完文件內容之后,通過SUCCESS_STREAM告訴statistics bolt,文件已經處理完畢,可以打印當前的統計結果
RandomSentenceSpout.java
declareOutputFields
添加SUCCESS_STREAM
@Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); declarer.declareStream("SUCCESS_STREAM",new Fields("word")); }
nextTuple
使用SUCCESS_STREAM通知下游,文件處理完畢
@Override public void nextTuple() { Utils.sleep(100); String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away", "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" }; if ( count == sentences.length ) { System.out.println(count+" try to emit tuple by success_stream"); _collector.emit("SUCCESS_STREAM",new Values(sentences[0])); count++; }else if ( count < sentences.length ){ _collector.emit(new Values(sentences[count])); count++; } }
WordCountTopology.java
添加靜態類WordCount2
public static class WordCount2 extends BaseBasicBolt { Map<String, Integer> counts = new HashMap<String, Integer>(); @Override public void execute(Tuple tuple, BasicOutputCollector collector) { if ( tuple.getSourceStreamId() == "SUCCESS_STREAM" ) { System.out.println("prepare to print the statistics"); for (String key : counts.keySet()) { System.out.println(key+"\t"+counts.get(key)); } System.out.println("finish printing"); }else { String word = tuple.getString(0); Integer count = counts.get(word); if (count == null) count = 0; count++; counts.put(word, count); } }
main函數
將spout的並行數由5改為1
builder.setSpout("spout", new RandomSentenceSpout(), 1);
在原有的Topology中添加WordCount2 Bolt
builder.setBolt("count2", new WordCount2(), 1).globalGrouping("count").globalGrouping("spout","SUCCESS_STREAM");
WordCount2 Bolt會接收從Count Bolt通過default stream發送的tuple,同時接收Spout通過SUCCESS_STREAM發送的tuple,也就是說wordcount2會接收從兩個stream來的數據。
編譯
編譯修改后的源文件
cd $STROM_STARTER
lein compile storm.starter
可能會出現以下異常信息,該異常可以忽略。
Exception in thread "main" java.io.FileNotFoundException: Could not locate storm/starter/WordCountTopology__init.class or storm/starter/WordCountTopology.clj on classpath:
運行
在local模式下運行修改后的WordCountTopology
java -cp $(lein classpath) storm.starter.WordCountTopology
如果一切正常,日志如下所示,線程的名字可能會有所不同。
moon 1
score 1
cow 1
doctor 1
over 1
nature 1
snow 1
four 1
keeps 1
with 1
a 1
white 1
dwarfs 1
at 1
the 4
and 2
i 1
two 1
away 1
seven 2
apple 1
am 1
an 1
jumped 1
day 1
years 1
ago 1
結果驗證
可以將WordCountTopology的運行結果和awk腳本的運行結果相比對,結果應該是一致的。
小技巧
- awk腳本的執行結果存為一個文件result1.log, WordCountTopology的輸出中單詞統計部分存為result2.log
- 用vim打開result1.log,進行sorting,保存結果;用vim打開result2.log,進行sorting,保存。
- 然后用vimdiff來進行比較 vimdiff result1.log result2.log