- nimbus是整個storm任務的管理者,並不實際進行工作。負責在集群中分發代碼,對節點分配任務,並監視主機故障。
- supervisor是實際進行工作的節點,負責監聽工作節點上已經分配的主機作業,啟動和停止Nimbus已經分配的工作進程。
- Worker是具體處理Spout/Bolt邏輯的進程,worker數量由拓撲中的conf.setNumWorkers來定義,storm會在每個Worker上均勻分配任務,一個Worker只能執行一個topology,但是可以執行其中的多個任務線程。
- 一個worker是一個進程,被啟動的時候表現為一個JVM進程(內存更改需要配置storm.yaml里面的worker.childopts: "-Xmx2048m"參數),里面可以同時運行多個線程,這些線程就是task。
- Tuple是spout與bolt、bolt與bolt之間傳遞消息(流)的基本單元,對於Storm來說是一個無邊界的鏈表,每個值要事先聲明它的域(field)
- task是spout和bolt執行的最小單元。
- 下面的結構圖顯示了各個component之間的關系
圖片來自:http://www.cnblogs.com/foreach-break/p/storm_worker_executor_spout_bolt_simbus_supervisor_mk-assignments.html
參考:http://blog.csdn.net/cuihaolong/article/details/52652686(storm各個節點介紹和容錯機制)

- 本地模式(Local Mode): 即Topology(相當於一個任務,后續會詳細講解) 運行在本地機器的單一JVM上,這個模式主要用來開發、調試。
- 遠程模式(Remote Mode):在這個模式,我們把我們的Topology提交到集群,在這個模式中,Storm的所有組件都是線程安全的,因為它們都會運行在不同的Jvm或物理機器上,這個模式就是正式的生產模式。
- spout隨機發送一個准備好的字符串數組里面的一個字符串(sentence)
- 第一層SplitBolt,負責對spout發過來的數據(sentence)進行split,分解成獨立的單詞,並按照一定的規則發往下一層bolt處理
- 第二層CountBolt,接收第一層bolt傳過來的數據,並對各個單詞進行數量計算
- spout數據源
- bolt1進行split操作
- bolt2進行count操作
- Topolgy運行程序
- setSpout,setBolt,shuffleGrouping——見代碼注釋和之后的Grouping方式介紹,
- setNumWorkers——設置worker數量,每個worker占用一個端口(storm.yaml里面的supervisor.slots.ports配置)
- setNumTasks——設置每個executor跑多少個task(本實例中沒有配置這個參數,jstorm默認每個executor跑一個task[spout/bolt])
- setMaxTaskParallelism——設置此拓撲中組件允許的最大並行度。(此配置通常用於測試以限制所生成的線程數)

1 package act.chenkh.study.jstormPlay; 2 3 import java.io.File; 4 5 import backtype.storm.Config; 6 import backtype.storm.LocalCluster; 7 import backtype.storm.StormSubmitter; 8 import backtype.storm.topology.TopologyBuilder; 9 import backtype.storm.tuple.Fields; 10 11 public class WordCountTopology { 12 public static void main(String[] args) throws Exception { 13 /**第一步,設計一個Topolgy*/ 14 TopologyBuilder builder = new TopologyBuilder(); 15 /* 16 * 設置spout和bolt,完整參數為 17 * 1,spout的id(即name) 18 * 2,spout對象 19 * 3,executor數量即並發數,也就是設置多少個executor來執行spout/bolt(此項沒有默認null) 20 */ 21 //setSpout 22 builder.setSpout("sentence-spout",new RandomSentenceSpout(),1); 23 //setBolt:SplitBolt的grouping策略是上層隨機分發,CountBolt的grouping策略是按照上層字段分發 24 //如果想要從多個Bolt獲取數據,可以繼續設置grouping 25 builder.setBolt("split-bolt", new SplitBolt(),1) 26 .shuffleGrouping("sentence-spout"); 27 builder.setBolt("count-bolt", new CountBolt(),1) 28 .fieldsGrouping("split-bolt", new Fields("word")) 29 .fieldsGrouping("sentence-spout",new Fields("word")); 30 /**第二步,進行基本配置*/ 31 Config conf = new Config(); 32 //作用和影響??????????? 33 conf.setDebug(true); 34 if (args != null && args.length > 0) { 35 conf.setNumWorkers(1); 36 StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); 37 } 38 else { 39 /* 40 * run in local cluster, for test in eclipse. 41 */ 42 conf.setMaxTaskParallelism(3); 43 LocalCluster cluster = new LocalCluster(); 44 cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology()); 45 Thread.sleep(Integer.MAX_VALUE); 46 cluster.shutdown(); 47 } 48 } 49 }
- open——spout初始化調用
- nextTuple——系統不斷調用
- declareOutputFields——聲明輸出tuple包含哪些字段

1 package act.chenkh.study.jstormPlay; 2 3 import java.util.Map; 4 import java.util.Random; 5 6 import org.apache.log4j.Logger; 7 import backtype.storm.spout.SpoutOutputCollector; 8 import backtype.storm.task.TopologyContext; 9 import backtype.storm.topology.IRichSpout; 10 import backtype.storm.topology.OutputFieldsDeclarer; 11 import backtype.storm.tuple.Fields; 12 import backtype.storm.tuple.Values; 13 import backtype.storm.utils.Time; 14 import backtype.storm.utils.Utils; 15 /* 16 * RandomSentenceSpout實現了IRichSpout接口 17 * Spout需要實現的接口可以是: 18 * 1,IRichSpout:最基本的Spout,繼承自ISpout, IComponent,沒有任何特殊方法(一般用這個) 19 * 2,IControlSpout:繼承自IComponent,包括open,close,activate,deactivate,nextTuple,ack(Object msgId),fail等方法 20 */ 21 public class RandomSentenceSpout implements IRichSpout { 22 23 /** 24 * 25 */ 26 private static final long serialVersionUID = 4058847280819269954L; 27 private static final Logger logger = Logger.getLogger(RandomSentenceSpout.class); 28 SpoutOutputCollector _collector; 29 Random _rand; 30 String component; 31 /* 32 * Spout初始化的時候調用 33 */ 34 public void open(Map conf, TopologyContext context, SpoutOutputCollector collector){ 35 _collector = collector; 36 _rand = new Random(); 37 component = context.getThisComponentId(); 38 } 39 /* 40 * 系統框架會不斷調用 41 */ 42 public void nextTuple() { 43 String[] sentences = new String[] { "Hello world! This is my first programme of JStorm", 44 "Hello JStorm,Nice to meet you!", "Hi JStorm, do you have a really good proformance", 45 "Goodbye JStorm,see you tomorrow" }; 46 String sentence = sentences[_rand.nextInt(sentences.length)]; 47 _collector.emit(new Values(sentence), Time.currentTimeSecs()); 48 Utils.sleep(1000); 49 } 50 @Override 51 public void ack(Object arg0) { 52 logger.debug("ACK!"); 53 } 54 55 public void activate() { 56 logger.debug("ACTIVE!"); 57 } 58 59 public void close() { 60 61 } 62 63 public void deactivate() { 64 65 } 66 67 public void fail(Object arg0) { 68 logger.debug("FAILED!"); 69 } 70 /* 71 * 聲明框架有哪些輸出的字段 72 */ 73 public void declareOutputFields(OutputFieldsDeclarer declarer) { 74 declarer.declare(new Fields("word")); 75 } 76 77 public Map<String, Object> getComponentConfiguration() { 78 return null; 79 } 80 81 }
2,SplitBolt類:接收上層tuple,進行split,分發給下一層
重要方法和參數解釋:
- cleanup,execute,prepare,declareOutputFields——見代碼注釋

1 package act.chenkh.study.jstormPlay; 2 3 import java.util.Map; 4 5 //import org.slf4j.Logger; 6 //import org.slf4j.LoggerFactory; 7 8 import org.apache.log4j.Logger; 9 10 import backtype.storm.task.TopologyContext; 11 import backtype.storm.topology.BasicOutputCollector; 12 import backtype.storm.topology.OutputFieldsDeclarer; 13 import backtype.storm.topology.base.BaseBasicBolt; 14 import backtype.storm.tuple.Fields; 15 import backtype.storm.tuple.Tuple; 16 import backtype.storm.tuple.Values; 17 /* 18 * 19 * IBasicBolt:繼承自IComponent,包括prepare,execut,cleanup等方法 20 */ 21 public class SplitBolt extends BaseBasicBolt { 22 /** 23 * 24 */ 25 private static final long serialVersionUID = 7104767103420386784L; 26 private static final Logger logger = Logger.getLogger(SplitBolt.class); 27 String component; 28 /* cleanup方法在bolt被關閉的時候調用, 它應該清理所有被打開的資源。(基本只能在local mode使用) 29 * 但是集群不保證這個方法一定會被執行。比如執行task的機器down掉了,那么根本就沒有辦法來調用那個方法。 30 * cleanup設計的時候是被用來在local mode的時候才被調用(也就是說在一個進程里面模擬整個storm集群), 31 * 並且你想在關閉一些topology的時候避免資源泄漏。 32 * (非 Javadoc) 33 * @see backtype.storm.topology.base.BaseBasicBolt#cleanup() 34 */ 35 public void cleanup() { 36 37 } 38 //接收消息之后被調用的方法 39 public void execute(Tuple input,BasicOutputCollector collector) { 40 String sentence = input.getString(0); 41 String[] words = sentence.split("[,|\\s+]"); 42 for(String word : words){ 43 word = word.trim(); 44 if(!word.isEmpty()){ 45 word = word.toLowerCase(); 46 collector.emit(new Values(word)); 47 } 48 } 49 } 50 /* 51 * prepare方法在worker初始化task的時候調用. 52 * 53 * prepare方法提供給bolt一個Outputcollector用來發射tuple。 54 * Bolt可以在任何時候發射tuple — 在prepare, execute或者cleanup方法里面, 或者甚至在另一個線程里面異步發射。 55 * 這里prepare方法只是簡單地把OutputCollector作為一個類字段保存下來給后面execute方法 使用。 56 */ 57 58 public void prepare(Map stromConf, TopologyContext context) { 59 component = context.getThisComponentId(); 60 } 61 62 /* 63 * declearOutputFields方法僅在有新的topology提交到服務器, 64 * 用來決定輸出內容流的格式(相當於定義spout/bolt之間傳輸stream的name:value格式), 65 * 在topology執行的過程中並不會被調用. 66 * (非 Javadoc) 67 * @see backtype.storm.topology.IComponent#declareOutputFields(backtype.storm.topology.OutputFieldsDeclarer) 68 */ 69 public void declareOutputFields(OutputFieldsDeclarer declarer) { 70 declarer.declare(new Fields("word")); 71 } 72 }
3,CountBolt類:接收上層tuple,進行count,展示輸出

1 package act.chenkh.study.jstormPlay; 2 3 import java.util.HashMap; 4 import java.util.Map; 5 6 import org.apache.log4j.Logger; 7 8 import com.alibaba.jstorm.callback.AsyncLoopThread; 9 import com.alibaba.jstorm.callback.RunnableCallback; 10 11 import backtype.storm.task.TopologyContext; 12 import backtype.storm.topology.BasicOutputCollector; 13 import backtype.storm.topology.OutputFieldsDeclarer; 14 import backtype.storm.topology.base.BaseBasicBolt; 15 import backtype.storm.tuple.Fields; 16 import backtype.storm.tuple.Tuple; 17 import clojure.inspector__init; 18 19 public class CountBolt extends BaseBasicBolt { 20 Integer id; 21 String name; 22 Map<String, Integer> counters; 23 String component; 24 private static final Logger LOG = Logger.getLogger(CountBolt.class); 25 private AsyncLoopThread statThread; 26 /** 27 * On create 28 */ 29 @Override 30 public void prepare(Map stormConf, TopologyContext context) { 31 this.counters = new HashMap<String, Integer>(); 32 this.name = context.getThisComponentId(); 33 this.id = context.getThisTaskId(); 34 this.statThread = new AsyncLoopThread(new statRunnable()); 35 36 LOG.info(stormConf.get("abc")+"!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"); 37 component = context.getThisComponentId(); 38 } 39 40 public void declareOutputFields(OutputFieldsDeclarer declarer) { 41 declarer.declare(new Fields("word","count")); 42 // declarer.declareStream("coord-"+"word-counter", new Fields("epoch","ebagNum")); 43 // LOG.info("set stream coord-"+component); 44 } 45 46 //接收消息之后被調用的方法 47 public void execute(Tuple input, BasicOutputCollector collector) { 48 // String str = input.getString(0); 49 String str = input.getStringByField("word"); 50 if(!counters.containsKey(str)){ 51 counters.put(str, 1); 52 }else{ 53 Integer c = counters.get(str) + 1; 54 counters.put(str, c); 55 } 56 } 57 class statRunnable extends RunnableCallback { 58 59 @Override 60 public void run() { 61 while(true){ 62 try { 63 Thread.sleep(10000); 64 } catch (InterruptedException e) { 65 66 } 67 LOG.info("\n-- Word Counter ["+name+"-"+id+"] --"); 68 for(Map.Entry<String, Integer> entry : counters.entrySet()){ 69 LOG.info(entry.getKey()+": "+entry.getValue()); 70 } 71 LOG.info(""); 72 } 73 74 } 75 } 76 77 }
參考:http://fireinwind.iteye.com/blog/2153699(第一個Storm應用)
三、Grouping的幾種方式
四、Bolt的聲明周期
1、在定義Topology實例過程中,定義好Spout實例和Bolt實例
2、在提交Topology實例給Nimbus的過程中,會調用TopologyBuilder實例的createTopology()方法,以獲取定義的Topology實例。在運行createTopology()方法的過程中,會去調用Spout和Bolt實例上的declareOutputFields()方法和getComponentConfiguration()方法,declareOutputFields()方法配置Spout和Bolt實例的輸出,getComponentConfiguration()方法輸出特定於Spout和Bolt實例的配置參數值對。Storm會將以上過程中得到的實例,輸出配置和配置參數值對等數據序列化,然后傳遞給Nimbus。
3、在Worker Node上運行的thread,從Nimbus上復制序列化后得到的字節碼文件,從中反序列化得到Spout和Bolt實例,實例的輸出配置和實例的配置參數值對等數據,在thread中Spout和Bolt實例的declareOutputFields()和getComponentConfiguration()不會再運行。
4、在thread中,反序列化得到一個Bolt實例后,它會先運行Bolt實例的prepare()方法,在這個方法調用中,需要傳入一個OutputCollector實例,后面使用該OutputCollector實例輸出Tuple
5、接下來在該thread中按照配置數量建立task集合,然后在每個task中就會循環調用thread所持有Bolt實例的execute()方法
6、在關閉一個thread時,thread所持有的Bolt實例會調用cleanup()方法
不過如果是強制關閉,這個cleanup()方法有可能不會被調用到
五、Stream里面的Tuple

1 public List<Integer> emit(List<Object> tuple, Object messageId) { 2 return emit(Utils.DEFAULT_STREAM_ID, tuple, messageId); 3 }
這里的tuple, 實際上是List<Object> 對象,返回的是 List<Integer> 是要發送的tast的IdsList
在bolt接收的時候, 變成一個Tuple對象, 結構應該也是一個list, List<Field1, value1, Field2, value2..>這樣的一個結構, FieldList ValueList, 我們根據對應的fieldname就可以取出對應的getIntegerByField方法