JStorm第一個程序WordCount詳解


一、Strom基本知識(回顧)
1,首先明確Storm各個組件的作用,包括Nimbus,Supervisor,Spout,Bolt,Task,Worker,Tuple
  • nimbus是整個storm任務的管理者,並不實際進行工作。負責在集群中分發代碼,對節點分配任務,並監視主機故障。
  • supervisor是實際進行工作的節點,負責監聽工作節點上已經分配的主機作業,啟動和停止Nimbus已經分配的工作進程。
  • Worker是具體處理Spout/Bolt邏輯的進程,worker數量由拓撲中的conf.setNumWorkers來定義,storm會在每個Worker上均勻分配任務,一個Worker只能執行一個topology,但是可以執行其中的多個任務線程。
  • 一個worker是一個進程,被啟動的時候表現為一個JVM進程(內存更改需要配置storm.yaml里面的worker.childopts: "-Xmx2048m"參數),里面可以同時運行多個線程,這些線程就是task。
  • Tuple是spout與bolt、bolt與bolt之間傳遞消息(流)的基本單元,對於Storm來說是一個無邊界的鏈表,每個值要事先聲明它的域(field)
  • task是spout和bolt執行的最小單元。
  • 下面的結構圖顯示了各個component之間的關系

圖片來自:http://www.cnblogs.com/foreach-break/p/storm_worker_executor_spout_bolt_simbus_supervisor_mk-assignments.html

參考:http://blog.csdn.net/cuihaolong/article/details/52652686(storm各個節點介紹和容錯機制)

2,一個簡單的storm程序的基本流程是:spout作為數據源(可以來自hdfs,hbase等,也可以自發產生數據,比如wordcount這個例子)傳送給bolt,bolt對數據進行處理,傳給其它bolt或者直接輸出。他們之間傳送的數據是Tuple,可以成為數據元組。
3,Storm運行模式:
  • 本地模式(Local Mode): 即Topology(相當於一個任務,后續會詳細講解)  運行在本地機器的單一JVM上,這個模式主要用來開發、調試。
  • 遠程模式(Remote Mode):在這個模式,我們把我們的Topology提交到集群,在這個模式中,Storm的所有組件都是線程安全的,因為它們都會運行在不同的Jvm或物理機器上,這個模式就是正式的生產模式。
二、WordCount詳解
程序描述
  • spout隨機發送一個准備好的字符串數組里面的一個字符串(sentence)
  • 第一層SplitBolt,負責對spout發過來的數據(sentence)進行split,分解成獨立的單詞,並按照一定的規則發往下一層bolt處理
  • 第二層CountBolt,接收第一層bolt傳過來的數據,並對各個單詞進行數量計算
程序流程
  • spout數據源
  • bolt1進行split操作
  • bolt2進行count操作
  • Topolgy運行程序
0,WordCountTopology類:創建拓撲,運行程序
重要方法和參數解釋:
  • setSpout,setBolt,shuffleGrouping——見代碼注釋和之后的Grouping方式介紹
  • setNumWorkers——設置worker數量,每個worker占用一個端口(storm.yaml里面的supervisor.slots.ports配置)
  • setNumTasks——設置每個executor跑多少個task(本實例中沒有配置這個參數,jstorm默認每個executor跑一個task[spout/bolt])
  • setMaxTaskParallelism——設置此拓撲中組件允許的最大並行度。(此配置通常用於測試以限制所生成的線程數)
 1 package act.chenkh.study.jstormPlay;
 2 
 3 import java.io.File;
 4 
 5 import backtype.storm.Config;
 6 import backtype.storm.LocalCluster;
 7 import backtype.storm.StormSubmitter;
 8 import backtype.storm.topology.TopologyBuilder;
 9 import backtype.storm.tuple.Fields;
10 
11 public class WordCountTopology {
12     public static void main(String[] args) throws Exception {
13         /**第一步,設計一個Topolgy*/
14         TopologyBuilder builder = new TopologyBuilder();
15         /*
16          * 設置spout和bolt,完整參數為
17          * 1,spout的id(即name)
18          * 2,spout對象
19          * 3,executor數量即並發數,也就是設置多少個executor來執行spout/bolt(此項沒有默認null)
20          */
21         //setSpout
22         builder.setSpout("sentence-spout",new RandomSentenceSpout(),1);
23         //setBolt:SplitBolt的grouping策略是上層隨機分發,CountBolt的grouping策略是按照上層字段分發
24         //如果想要從多個Bolt獲取數據,可以繼續設置grouping
25         builder.setBolt("split-bolt", new SplitBolt(),1)
26             .shuffleGrouping("sentence-spout");
27         builder.setBolt("count-bolt", new CountBolt(),1)
28             .fieldsGrouping("split-bolt", new Fields("word"))
29             .fieldsGrouping("sentence-spout",new Fields("word"));
30         /**第二步,進行基本配置*/  
31         Config conf = new Config();
32         //作用和影響???????????
33         conf.setDebug(true);
34         if (args != null && args.length > 0) {
35             conf.setNumWorkers(1);
36             StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
37             }
38         else {
39             /*
40              * run in local cluster, for test in eclipse.
41              */
42             conf.setMaxTaskParallelism(3);
43             LocalCluster cluster = new LocalCluster();  
44             cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());  
45             Thread.sleep(Integer.MAX_VALUE);  
46             cluster.shutdown();  
47         }
48     }
49 }
View Code
1,RandomSentenceSpout類:產生數據
重要方法和參數解釋:
  • open——spout初始化調用
  • nextTuple——系統不斷調用
  • declareOutputFields——聲明輸出tuple包含哪些字段
 1 package act.chenkh.study.jstormPlay;
 2 
 3 import java.util.Map;
 4 import java.util.Random;
 5 
 6 import org.apache.log4j.Logger;
 7 import backtype.storm.spout.SpoutOutputCollector;
 8 import backtype.storm.task.TopologyContext;
 9 import backtype.storm.topology.IRichSpout;
10 import backtype.storm.topology.OutputFieldsDeclarer;
11 import backtype.storm.tuple.Fields;
12 import backtype.storm.tuple.Values;
13 import backtype.storm.utils.Time;
14 import backtype.storm.utils.Utils;
15 /*
16  * RandomSentenceSpout實現了IRichSpout接口
17  * Spout需要實現的接口可以是:
18  *    1,IRichSpout:最基本的Spout,繼承自ISpout, IComponent,沒有任何特殊方法(一般用這個)
19  *    2,IControlSpout:繼承自IComponent,包括open,close,activate,deactivate,nextTuple,ack(Object msgId),fail等方法
20  */
21 public class RandomSentenceSpout implements IRichSpout {
22     
23     /**
24      * 
25      */
26     private static final long serialVersionUID = 4058847280819269954L;
27     private static final Logger logger = Logger.getLogger(RandomSentenceSpout.class);
28     SpoutOutputCollector _collector;
29     Random _rand;
30     String component;
31     /*
32      * Spout初始化的時候調用
33      */
34     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector){
35         _collector = collector;
36         _rand = new Random();
37         component = context.getThisComponentId();
38     }
39     /*
40      * 系統框架會不斷調用
41      */
42     public void nextTuple() {
43         String[] sentences = new String[] { "Hello world! This is my first programme of JStorm",
44                 "Hello JStorm,Nice to meet you!", "Hi JStorm, do you have a really good proformance",
45                 "Goodbye JStorm,see you tomorrow" };
46         String sentence = sentences[_rand.nextInt(sentences.length)];
47         _collector.emit(new Values(sentence), Time.currentTimeSecs());
48         Utils.sleep(1000);
49     }
50     @Override
51     public void ack(Object arg0) {
52         logger.debug("ACK!");
53     }
54 
55     public void activate() {
56         logger.debug("ACTIVE!");
57     }
58 
59     public void close() {
60 
61     }
62 
63     public void deactivate() {
64 
65     }
66 
67     public void fail(Object arg0) {
68         logger.debug("FAILED!");
69     }
70     /*
71      * 聲明框架有哪些輸出的字段
72      */
73     public void declareOutputFields(OutputFieldsDeclarer declarer) {
74         declarer.declare(new Fields("word"));
75     }
76 
77     public Map<String, Object> getComponentConfiguration() {
78         return null;
79     }
80 
81 }
View Code

2,SplitBolt類:接收上層tuple,進行split,分發給下一層

重要方法和參數解釋:

  • cleanup,execute,prepare,declareOutputFields——見代碼注釋
 1 package act.chenkh.study.jstormPlay;
 2 
 3 import java.util.Map;
 4 
 5 //import org.slf4j.Logger;
 6 //import org.slf4j.LoggerFactory;
 7 
 8 import org.apache.log4j.Logger;
 9 
10 import backtype.storm.task.TopologyContext;
11 import backtype.storm.topology.BasicOutputCollector;
12 import backtype.storm.topology.OutputFieldsDeclarer;
13 import backtype.storm.topology.base.BaseBasicBolt;
14 import backtype.storm.tuple.Fields;
15 import backtype.storm.tuple.Tuple;
16 import backtype.storm.tuple.Values;
17 /*
18  * 
19  * IBasicBolt:繼承自IComponent,包括prepare,execut,cleanup等方法
20  */
21 public class SplitBolt extends BaseBasicBolt {
22     /**
23      * 
24      */
25     private static final long serialVersionUID = 7104767103420386784L;
26     private static final Logger logger = Logger.getLogger(SplitBolt.class);
27     String component;
28     /* cleanup方法在bolt被關閉的時候調用, 它應該清理所有被打開的資源。(基本只能在local mode使用)
29      * 但是集群不保證這個方法一定會被執行。比如執行task的機器down掉了,那么根本就沒有辦法來調用那個方法。 
30      * cleanup設計的時候是被用來在local mode的時候才被調用(也就是說在一個進程里面模擬整個storm集群), 
31      * 並且你想在關閉一些topology的時候避免資源泄漏。
32      * (非 Javadoc)
33      * @see backtype.storm.topology.base.BaseBasicBolt#cleanup()
34      */
35     public void cleanup() {
36 
37     }
38     //接收消息之后被調用的方法
39     public void execute(Tuple input,BasicOutputCollector collector) {
40         String sentence = input.getString(0);
41         String[] words = sentence.split("[,|\\s+]");
42         for(String word : words){
43             word = word.trim();
44             if(!word.isEmpty()){
45                 word = word.toLowerCase();
46                 collector.emit(new Values(word));
47             }
48         }
49     }
50     /*
51      * prepare方法在worker初始化task的時候調用. 
52      * 
53      * prepare方法提供給bolt一個Outputcollector用來發射tuple。
54      * Bolt可以在任何時候發射tuple — 在prepare, execute或者cleanup方法里面, 或者甚至在另一個線程里面異步發射。
55      * 這里prepare方法只是簡單地把OutputCollector作為一個類字段保存下來給后面execute方法 使用。
56      */
57     
58     public void prepare(Map stromConf, TopologyContext context) {
59         component = context.getThisComponentId();
60     }
61 
62     /*
63      * declearOutputFields方法僅在有新的topology提交到服務器, 
64      * 用來決定輸出內容流的格式(相當於定義spout/bolt之間傳輸stream的name:value格式), 
65      * 在topology執行的過程中並不會被調用.
66      * (非 Javadoc)
67      * @see backtype.storm.topology.IComponent#declareOutputFields(backtype.storm.topology.OutputFieldsDeclarer)
68      */
69     public void declareOutputFields(OutputFieldsDeclarer declarer) {
70         declarer.declare(new Fields("word"));
71     }
72 }
View Code

3,CountBolt類:接收上層tuple,進行count,展示輸出

 1 package act.chenkh.study.jstormPlay;
 2 
 3 import java.util.HashMap;
 4 import java.util.Map;
 5 
 6 import org.apache.log4j.Logger;
 7 
 8 import com.alibaba.jstorm.callback.AsyncLoopThread;
 9 import com.alibaba.jstorm.callback.RunnableCallback;
10 
11 import backtype.storm.task.TopologyContext;
12 import backtype.storm.topology.BasicOutputCollector;
13 import backtype.storm.topology.OutputFieldsDeclarer;
14 import backtype.storm.topology.base.BaseBasicBolt;
15 import backtype.storm.tuple.Fields;
16 import backtype.storm.tuple.Tuple;
17 import clojure.inspector__init;
18 
19 public class CountBolt extends BaseBasicBolt {
20     Integer id;  
21     String name;  
22     Map<String, Integer> counters;     
23     String component;
24     private static final Logger LOG = Logger.getLogger(CountBolt.class);
25     private AsyncLoopThread statThread;
26     /** 
27      * On create  
28      */  
29     @Override  
30     public void prepare(Map stormConf, TopologyContext context) {  
31         this.counters = new HashMap<String, Integer>();  
32         this.name = context.getThisComponentId();  
33         this.id = context.getThisTaskId();  
34         this.statThread = new AsyncLoopThread(new statRunnable());
35         
36         LOG.info(stormConf.get("abc")+"!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
37         component = context.getThisComponentId();
38     }  
39   
40     public void declareOutputFields(OutputFieldsDeclarer declarer) {
41          declarer.declare(new Fields("word","count"));
42          // declarer.declareStream("coord-"+"word-counter", new Fields("epoch","ebagNum"));
43          // LOG.info("set stream coord-"+component);
44     }  
45   
46     //接收消息之后被調用的方法
47     public void execute(Tuple input, BasicOutputCollector collector) {
48 //        String str = input.getString(0);
49         String str = input.getStringByField("word");
50         if(!counters.containsKey(str)){  
51             counters.put(str, 1);  
52         }else{
53             Integer c = counters.get(str) + 1;  
54             counters.put(str, c);  
55         }  
56     }  
57     class statRunnable extends RunnableCallback {
58 
59         @Override
60         public void run() {
61             while(true){
62                 try {
63                     Thread.sleep(10000);
64                 } catch (InterruptedException e) {
65 
66                 }
67                 LOG.info("\n-- Word Counter ["+name+"-"+id+"] --");  
68                 for(Map.Entry<String, Integer> entry : counters.entrySet()){  
69                     LOG.info(entry.getKey()+": "+entry.getValue());  
70                 } 
71                 LOG.info("");
72             }
73             
74         }
75     }
76 
77 }
View Code

參考:http://fireinwind.iteye.com/blog/2153699(第一個Storm應用)

三、Grouping的幾種方式

四、Bolt的聲明周期

1、在定義Topology實例過程中,定義好Spout實例和Bolt實例
2、在提交Topology實例給Nimbus的過程中,會調用TopologyBuilder實例的createTopology()方法,以獲取定義的Topology實例。在運行createTopology()方法的過程中,會去調用Spout和Bolt實例上的declareOutputFields()方法和getComponentConfiguration()方法,declareOutputFields()方法配置Spout和Bolt實例的輸出,getComponentConfiguration()方法輸出特定於Spout和Bolt實例的配置參數值對。Storm會將以上過程中得到的實例,輸出配置和配置參數值對等數據序列化,然后傳遞給Nimbus。
3、在Worker Node上運行的thread,從Nimbus上復制序列化后得到的字節碼文件,從中反序列化得到Spout和Bolt實例,實例的輸出配置和實例的配置參數值對等數據,在thread中Spout和Bolt實例的declareOutputFields()和getComponentConfiguration()不會再運行。
4、在thread中,反序列化得到一個Bolt實例后,它會先運行Bolt實例的prepare()方法,在這個方法調用中,需要傳入一個OutputCollector實例,后面使用該OutputCollector實例輸出Tuple
5、接下來在該thread中按照配置數量建立task集合,然后在每個task中就會循環調用thread所持有Bolt實例的execute()方法
6、在關閉一個thread時,thread所持有的Bolt實例會調用cleanup()方法
不過如果是強制關閉,這個cleanup()方法有可能不會被調用到

五、Stream里面的Tuple

1,Stream是storm里面的關鍵抽象。一個stream是一個沒有邊界的tuple序列。
storm提供一些原語來分布式地、可靠地把一個stream傳輸進一個新的stream。比如: 你可以把一個tweets流傳輸到熱門話題的流。
storm提供的最基本的處理stream的原語是spout和bolt。你可以實現Spout和Bolt對應的接口以處理你的應用的邏輯。
spout的流的源頭。比如一個spout可能從Kestrel隊列里面讀取消息並且把這些消息發射成一個流。又比如一個spout可以調用twitter的一個api並且把返回的tweets發射成一個流。
通常Spout會從外部數據源(隊列、數據庫等)讀取數據,然后封裝成Tuple形式,之后發送到Stream中。Spout是一個主動的角色,在接口內部有個nextTuple函數,Storm框架會不停的調用該函數。

bolt可以接收任意多個輸入stream, 作一些處理, 有些bolt可能還會發射一些新的stream。一些復雜的流轉換, 比如從一些tweet里面計算出熱門話題, 需要多個步驟, 從而也就需要多個bolt。 Bolt可以做任何事情: 運行函數, 過濾tuple, 做一些聚合, 做一些合並以及訪問數據庫等等。
Bolt處理輸入的Stream,並產生新的輸出Stream。Bolt可以執行過濾、函數操作、Join、操作數據庫等任何操作。Bolt是一個被動的 角色,其接口中有一個execute(Tuple input)方法,在接收到消息之后會調用此函數,用戶可以在此方法中執行自己的處理邏輯。

spout和bolt所組成一個網絡會被打包成topology, topology是storm里面最高一級的抽象(類似 Job), 你可以把topology提交給storm的集群來運行。
參考:http://www.cnblogs.com/wuxiang/p/5629138.html(Storm入門原理介紹)
2,Tuple: 消息傳遞的基本單位。
 在spout發送的時候,函數原型
1 public List<Integer> emit(List<Object> tuple, Object messageId) {
2         return emit(Utils.DEFAULT_STREAM_ID, tuple, messageId);
3     }
View Code

這里的tuple, 實際上是List<Object> 對象,返回的是 List<Integer> 是要發送的tast的IdsList

在bolt接收的時候, 變成一個Tuple對象,  結構應該也是一個list, List<Field1, value1, Field2, value2..>這樣的一個結構, FieldList ValueList, 我們根據對應的fieldname就可以取出對應的getIntegerByField方法

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM