Storm WordCount Topology學習


1,分布式單詞計數的流程

首先要有數據源,在SentenceSpout中定義了一個字符串數組sentences來模擬數據源。字符串數組中的每句話作為一個tuple發射。其實,SplitBolt接收SentenceSpout發射的tuple,它將每句話分割成每個單詞,並將每個單詞作為tuple發射。再次,WordCountBolt接收SplitBolt發送的tuple,它將接收到的每一個單詞統計計數,並將 <單詞:出現次數> 作為tuple發射。最后,ReportBolt接收WordCountBolt發送的tuple,將統計的結果存入HashMap中,並打印出結果。

流程圖如下:

 

2,Topology的組成類

 ISpout、IComponent、IBolt三個接口定義了一些最基本的方法,BaseRichSpout、BaseRichBolt是接口的實現類,自定義的Spout與Bolt通過繼承實現類來完成工作。

 詳細解釋參考代碼里面的注釋。參考《Storm分布式實時計算模式》第一章中的例子。

1, SentenceSpou.java分析:

private String[] sentences = { "my dog has fleas", "i like cold beverages",
             "the dog ate my homework", "don't have a cow man",
             "i don't think i like fleas" };

定義了待發射的數據源。Spout從該字符串數組一次取一個字符串生成tuple進行發射。

 

32     public void open(@SuppressWarnings("rawtypes") Map conf, 33 TopologyContext context, SpoutOutputCollector collector) { 34 // TODO Auto-generated method stub 35 this.collector = collector; 36 }

open函數,在ISpout接口中定義,所有的Spout組件在初始化時調用這個方法。在open()中初始化了發射器。

 

55     public void declareOutputFields(OutputFieldsDeclarer declarer) {
56         // TODO Auto-generated method stub
57         declarer.declare(new Fields("sentence"));// 標記SentenceSpout發送的tuple的鍵為
58                                                     // sentence
59     }

declareOutputFields函數標記了該Spout發射的tuple的(字段值)鍵值。下游的Bolt可以通過該鍵值來接收它發出的tuple

 

41     public void nextTuple() {
42         // TODO Auto-generated method stub
43         // 以字符串數組sentences 中的每個字符串 作為參數 構造tuple
44         this.collector.emit(new Values(sentences[index]));// 通過emit方法將構造好的tuple發送出去
45         index++;
46         if (index >= sentences.length) {
47             index = 0;
48         }
49         Utils.sleep(100);
50     }

nextTuple()是所有Spout的核心方法。Storm通過調用這個方法向collector發射tuple。Values.java 繼承了ArrayList,new Values(...)構造了一個List對象,並將之作為emit的參數通過collector發射出去。

這里的發射規則是:每次發射其中一個字符串,阻塞100ms。當發射完整個字符串數組時,將索引(index)重新置0。可以繼續發射。除非顯示終止Topology,否則它不會停止。

 

 SentenceSpou.java代碼如下:

 1 package org.apache.storm.storm_core; 2 3 import java.util.Map; 4 5 import backtype.storm.spout.SpoutOutputCollector; 6 import backtype.storm.task.TopologyContext; 7 import backtype.storm.topology.OutputFieldsDeclarer; 8 import backtype.storm.topology.base.BaseRichSpout; 9 import backtype.storm.tuple.Fields; 10 import backtype.storm.tuple.Values; 11 import backtype.storm.utils.Utils; 12 13 public class SentenceSpout extends BaseRichSpout { 14 /** 15 * 16 */ 17 private static final long serialVersionUID = 3444934973982660864L; 18 private SpoutOutputCollector collector;// 用來向其他Spout發射tuple 19 private String[] sentences = { "my dog has fleas", "i like cold beverages", 20 "the dog ate my homework", "don't have a cow man", 21 "i don't think i like fleas" }; 22 23 private int index = 0; 24 25 /* 26 * open() 方法在所有的Spout組件初始化時被調用 27 * 28 * @param Map conf storm 配置信息 29 * 30 * @context TopologyContext topology 組件信息 31 */ 32 public void open(@SuppressWarnings("rawtypes") Map conf, 33 TopologyContext context, SpoutOutputCollector collector) { 34 // TODO Auto-generated method stub 35 this.collector = collector; 36 } 37 38 /* 39 * Values.java extends ArrayList Storm 調用該方法向輸出的collector發射tuple 40 */ 41 public void nextTuple() { 42 // TODO Auto-generated method stub 43 // 以字符串數組sentences 中的每個字符串 作為參數 構造tuple 44 this.collector.emit(new Values(sentences[index]));// 通過emit方法將構造好的tuple發送出去 45 index++; 46 if (index >= sentences.length) { 47 index = 0; 48 } 49 Utils.sleep(100); 50 } 51 52 /* 53 * SentenceSpout 發送的tuple它是一個包含鍵值對的List,該方法聲明了List中包含的鍵值對的鍵為 sentence 54 */ 55 public void declareOutputFields(OutputFieldsDeclarer declarer) { 56 // TODO Auto-generated method stub 57 declarer.declare(new Fields("sentence"));// 標記SentenceSpout發送的tuple的鍵為 58 // sentence 59 } 60 }

 

 

SplitBolt.java代碼如下:

 1 package org.apache.storm.storm_core; 2 3 import java.util.Map; 4 5 import backtype.storm.task.OutputCollector; 6 import backtype.storm.task.TopologyContext; 7 import backtype.storm.topology.OutputFieldsDeclarer; 8 import backtype.storm.topology.base.BaseRichBolt; 9 import backtype.storm.tuple.Fields; 10 import backtype.storm.tuple.Tuple; 11 import backtype.storm.tuple.Values; 12 13 public class SplitSentenceBolt extends BaseRichBolt { 14 /** 15 * 16 */ 17 private static final long serialVersionUID = -2107029392155190729L; 18 private OutputCollector collector;// 用來向其他Spout發射tuple的發射器 19 20 /* 21 * (non-Javadoc) prepare方法類似於open方法,prepare在bolt初始化時被調用 22 */ 23 public void prepare(Map stormConf, TopologyContext context, 24 OutputCollector collector) { 25 // TODO Auto-generated method stub 26 this.collector = collector;// 發射器初始化 27 28 } 29 30 public void execute(Tuple input) { 31 // TODO Auto-generated method stub 32 // 接收從SentenceSpout的發射器發射過來的tuple,因為SentenceSpout中聲明的tuple字段為sentence,故getStringByField方法的參數為sentence 33 String sentence = input.getStringByField("sentence");// 該tuple是一個包含 34 // 鍵為sentence 35 // 值為字符串 36 // 的列表List<Map<sentence,String>> 37 String[] words = sentence.split(" ");// 將字符串分解成一個個的單詞 38 for (String word : words) 39 this.collector.emit(new Values(word));// 將每個單詞構造成tuple並發送給下一個Spout 40 } 41 42 public void declareOutputFields(OutputFieldsDeclarer declarer) { 43 // TODO Auto-generated method stub 44 declarer.declare(new Fields("word"));// 定義SplitSentenceBolt發送的tuple的字段("鍵值")為 word 45 } 46 }

 

WordCountBolt.java

 1 package org.apache.storm.storm_core;
 2 
 3 import java.util.HashMap;
 4 import java.util.Map;
 5 
 6 import backtype.storm.task.OutputCollector;
 7 import backtype.storm.task.TopologyContext;
 8 import backtype.storm.topology.OutputFieldsDeclarer;
 9 import backtype.storm.topology.base.BaseRichBolt;
10 import backtype.storm.tuple.Fields;
11 import backtype.storm.tuple.Tuple;
12 import backtype.storm.tuple.Values;
13 
14 public class WordCountBolt extends BaseRichBolt{
15 
16     private OutputCollector collector;
17     private HashMap<String, Long>counts = null;//統計每個單詞出現的次數,放到HashMap中保存起來
18     
19     public void prepare(Map stormConf, TopologyContext context,
20             OutputCollector collector) {
21         // TODO Auto-generated method stub
22         this.collector = collector;
23         this.counts = new HashMap<String, Long>();//初始化HashMap,因為prepare會被自動調用的
24     }
25 
26     public void execute(Tuple input) {
27         // TODO Auto-generated method stub
28         String word = input.getStringByField("word");
29         Long count = this.counts.get(word);
30         if(count == null)//HashMap中沒有word這個單詞
31             count = 0L;
32         count++;
33         this.counts.put(word, count);//更新該單詞在HashMap中的統計次數
34         //此處發射的tuple包含了兩個元素:單詞和計數,它每次發送的是一個長度為2的List,
35         //可理解為:List.add(new HashMap("word",word)); List.add(new HashMap(("count",count));
36         this.collector.emit(new Values(word, count));//第一個元素的鍵為 "word",值為該單詞(a string),第二個鍵為 "count",值為單詞的計數
37     }
38 
39     public void declareOutputFields(OutputFieldsDeclarer declarer) {
40         // TODO Auto-generated method stub
41         declarer.declare(new Fields("word", "count"));
42     }
43 }

 

 

 ReportBolt.java如下:

package org.apache.storm.storm_core; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Tuple; public class ReportBolt extends BaseRichBolt{ /** * */ private static final long serialVersionUID = 4921144902730095910L; // private OutputCollector collector; ReportBolt不需要發射tuple了 private HashMap<String, Long> counts = null; public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { // TODO Auto-generated method stub this.counts = new HashMap<String, Long>(); } public void execute(Tuple input) { // TODO Auto-generated method stub String word = input.getStringByField("word"); Long count = input.getLongByField("count"); this.counts.put(word, count); } public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub //不需要發出任何數據流  } //Topology在storm集群中運行時,cleanup方法是不可靠的,並不能保證它一定會執行 public void cleanup(){ System.out.println("------ print counts ------"); List<String> keys = new ArrayList<String>(); keys.addAll(counts.keySet());//將HashMap中所有的鍵都添加到一個集合中 Collections.sort(keys);//對鍵(單詞)進行排序 for(String key : keys)//輸出排好序的每個單詞的出現次數 System.out.println(key + " : " + this.counts.get(key)); System.out.println("--------bye----------"); } }

 

 

WordCountTopology.java如下:

 1 package org.apache.storm.storm_core;
 2 
 3 import backtype.storm.Config;
 4 import backtype.storm.LocalCluster;
 5 import backtype.storm.topology.TopologyBuilder;
 6 import backtype.storm.tuple.Fields;
 7 import backtype.storm.utils.Utils;
 8 
 9 public class WordCountTopology {
10     private static final String SENTENCE_SPOUT_ID = "sentence-spout";
11     private static final String SPLIT_BOLT_ID = "split-bolt";
12     private static final String COUNT_BOLT_ID = "count-bolt";
13     private static final String REPORT_BOLT_ID = "report-bolt";
14     private static final String TOPOLOGY_NAME = "word-count-topology";
15     
16     public static void main(String[] args) throws Exception{
17         SentenceSpout spout = new SentenceSpout();
18         SplitSentenceBolt splitBolt = new SplitSentenceBolt();
19         WordCountBolt countBolt = new WordCountBolt();
20         ReportBolt reportBolt = new ReportBolt();
21         
22         TopologyBuilder builder = new TopologyBuilder();
23         builder.setSpout(SENTENCE_SPOUT_ID, spout);
24         builder.setBolt(SPLIT_BOLT_ID, splitBolt).shuffleGrouping(SENTENCE_SPOUT_ID);
25         builder.setBolt(COUNT_BOLT_ID, countBolt).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
26         builder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(COUNT_BOLT_ID);
27         
28         Config config = new Config();
29         LocalCluster cluster = new LocalCluster();
30         
31         cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
32         Utils.sleep(1000);
33         cluster.killTopology(TOPOLOGY_NAME);
34         cluster.shutdown();
35         
36     }
37 }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM