最簡單的JStorm例子分為以下幾個步驟:
1、生成Topology
1 Map conf = new HashMp(); 2 //topology所有自定義的配置均放入這個Map 3 4 TopologyBuilder builder = new TopologyBuilder(); 5 //創建topology的生成器 6 7 int spoutParal = get("spout.parallel", 1); 8 //獲取spout的並發設置 9 10 SpoutDeclarer spout = builder.setSpout(SequenceTopologyDef.SEQUENCE_SPOUT_NAME, 11 new SequenceSpout(), spoutParal); 12 //創建Spout, 其中new SequenceSpout() 為真正spout對象,SequenceTopologyDef.SEQUENCE_SPOUT_NAME 為spout的名字,注意名字中不要含有空格 13 14 int boltParal = get("bolt.parallel", 1); 15 //獲取bolt的並發設置 16 17 BoltDeclarer totalBolt = builder.setBolt(SequenceTopologyDef.TOTAL_BOLT_NAME, new TotalCount(), 18 boltParal).shuffleGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME); 19 //創建bolt, SequenceTopologyDef.TOTAL_BOLT_NAME 為bolt名字,TotalCount 為bolt對象,boltParal為bolt並發數, 20 //shuffleGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME), 21 //表示接收SequenceTopologyDef.SEQUENCE_SPOUT_NAME的數據,並且以shuffle方式, 22 //即每個spout隨機輪詢發送tuple到下一級bolt中 23 24 int ackerParal = get("acker.parallel", 1); 25 Config.setNumAckers(conf, ackerParal); 26 //設置表示acker的並發數 27 28 int workerNum = get("worker.num", 10); 29 conf.put(Config.TOPOLOGY_WORKERS, workerNum); 30 //表示整個topology將使用幾個worker 31 32 conf.put(Config.STORM_CLUSTER_MODE, "distributed"); 33 //設置topolog模式為分布式,這樣topology就可以放到JStorm集群上運行 34 35 StormSubmitter.submitTopology(streamName, conf, 36 builder.createTopology()); 37 //提交topology
2、IRichSpout
IRichSpout 為最簡單的Spout接口
1 IRichSpout{ 2 3 @Override 4 public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { 5 } 6 7 @Override 8 public void close() { 9 } 10 11 @Override 12 public void activate() { 13 } 14 15 @Override 16 public void deactivate() { 17 } 18 19 @Override 20 public void nextTuple() { 21 } 22 23 @Override 24 public void ack(Object msgId) { 25 } 26 27 @Override 28 public void fail(Object msgId) { 29 } 30 31 @Override 32 public void declareOutputFields(OutputFieldsDeclarer declarer) { 33 } 34 35 @Override 36 public Map<String, Object> getComponentConfiguration() { 37 return null; 38 }
其中注意:
- spout對象必須是繼承Serializable, 因此要求spout內所有數據結構必須是可序列化的
- spout可以有構造函數,但構造函數只執行一次,是在提交任務時,創建spout對象,因此在task分配到具體worker之前的初始化工作可以在此處完成,一旦完成,初始化的內容將攜帶到每一個task內(因為提交任務時將spout序列化到文件中去,在worker起來時再將spout從文件中反序列化出來)。
- open是當task起來后執行的初始化動作
- close是當task被shutdown后執行的動作
- activate 是當task被激活時,觸發的動作
- deactivate 是task被deactive時,觸發的動作
- nextTuple 是spout實現核心, nextuple完成自己的邏輯,即每一次取消息后,用collector 將消息emit出去。
- ack, 當spout收到一條ack消息時,觸發的動作,詳情可以參考 ack機制
- fail, 當spout收到一條fail消息時,觸發的動作,詳情可以參考 ack機制
- declareOutputFields, 定義spout發送數據,每個字段的含義
- getComponentConfiguration 獲取本spout的component 配置
3、Bolt
1 IRichBolt { 2 3 @Override 4 public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { 5 } 6 7 @Override 8 public void execute(Tuple input) { 9 } 10 11 @Override 12 public void cleanup() { 13 } 14 15 @Override 16 public void declareOutputFields(OutputFieldsDeclarer declarer) { 17 } 18 19 @Override 20 public Map<String, Object> getComponentConfiguration() { 21 return null; 22 } 23 24 }
其中注意:
- bolt對象必須是繼承Serializable, 因此要求spout內所有數據結構必須是可序列化的
- bolt可以有構造函數,但構造函數只執行一次,是在提交任務時,創建bolt對象,因此在task分配到具體worker之前的初始化工作可以在此處完成,一旦完成,初始化的內容將攜帶到每一個task內(因為提交任務時將bolt序列化到文件中去,在worker起來時再將bolt從文件中反序列化出來)。
- prepare是當task起來后執行的初始化動作
- cleanup是當task被shutdown后執行的動作
- execute是bolt實現核心, 完成自己的邏輯,即接受每一次取消息后,處理完,有可能用collector 將產生的新消息emit出去。 ** 在executor中,當程序處理一條消息時,需要執行collector.ack, 詳情可以參考 ack機制 ** 在executor中,當程序無法處理一條消息時或出錯時,需要執行collector.fail ,詳情可以參考 ack機制
- declareOutputFields, 定義bolt發送數據,每個字段的含義
- getComponentConfiguration 獲取本bolt的component 配置
4、編譯
在Maven中配置
1 <dependency> 2 <groupId>com.alibaba.jstorm</groupId> 3 <artifactId>jstorm-client</artifactId> 4 <version>0.9.3.1</version> 5 <scope>provided</scope> 6 </dependency> 7 8 9 <dependency> 10 <groupId>com.alibaba.jstorm</groupId> 11 <artifactId>jstorm-client-extension</artifactId> 12 <version>0.9.3.1</version> 13 <scope>provided</scope> 14 </dependency>
如果找不到jstorm-client和jstorm-client-extension包,可以自己下載jstorm源碼進行編譯,請參考 源碼編譯
打包時,需要將所有依賴打入到一個包中
1 <build> 2 <plugins> 3 4 <plugin> 5 <artifactId>maven-assembly-plugin</artifactId> 6 <configuration> 7 <descriptorRefs> 8 <descriptorRef>jar-with-dependencies</descriptorRef> 9 </descriptorRefs> 10 <archive> 11 <manifest> 12 <mainClass>storm.starter.SequenceTopology</mainClass> 13 </manifest> 14 </archive> 15 </configuration> 16 <executions> 17 <execution> 18 <id>make-assembly</id> 19 <phase>package</phase> 20 <goals> 21 <goal>single</goal> 22 </goals> 23 </execution> 24 </executions> 25 </plugin> 26 <plugin> 27 <groupId>org.apache.maven.plugins</groupId> 28 <artifactId>maven-compiler-plugin</artifactId> 29 <configuration> 30 <source>1.6</source> 31 <target>1.6</target> 32 </configuration> 33 </plugin> 34 </plugins> 35 </build>
5、提交jar
jstorm jar xxxxxx.jar com.alibaba.xxxx.xx parameter
- xxxx.jar 為打包后的jar
- com.alibaba.xxxx.xx 為入口類,即提交任務的類
- parameter即為提交參數