一個最簡單的JStorm例子


最簡單的JStorm例子分為以下幾個步驟:

1、生成Topology

 1 Map conf = new HashMp();
 2 //topology所有自定義的配置均放入這個Map
 3 
 4 TopologyBuilder builder = new TopologyBuilder();
 5 //創建topology的生成器
 6 
 7 int spoutParal = get("spout.parallel", 1);
 8 //獲取spout的並發設置
 9 
10 SpoutDeclarer spout = builder.setSpout(SequenceTopologyDef.SEQUENCE_SPOUT_NAME,
11                 new SequenceSpout(), spoutParal);
12 //創建Spout, 其中new SequenceSpout() 為真正spout對象,SequenceTopologyDef.SEQUENCE_SPOUT_NAME 為spout的名字,注意名字中不要含有空格
13 
14 int boltParal = get("bolt.parallel", 1);
15 //獲取bolt的並發設置
16 
17 BoltDeclarer totalBolt = builder.setBolt(SequenceTopologyDef.TOTAL_BOLT_NAME, new TotalCount(),
18                 boltParal).shuffleGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME);
19 //創建bolt, SequenceTopologyDef.TOTAL_BOLT_NAME 為bolt名字,TotalCount 為bolt對象,boltParal為bolt並發數,
20 //shuffleGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME), 
21 //表示接收SequenceTopologyDef.SEQUENCE_SPOUT_NAME的數據,並且以shuffle方式,
22 //即每個spout隨機輪詢發送tuple到下一級bolt中
23 
24 int ackerParal = get("acker.parallel", 1);
25 Config.setNumAckers(conf, ackerParal);
26 //設置表示acker的並發數
27 
28 int workerNum = get("worker.num", 10);
29 conf.put(Config.TOPOLOGY_WORKERS, workerNum);
30 //表示整個topology將使用幾個worker
31 
32 conf.put(Config.STORM_CLUSTER_MODE, "distributed");
33 //設置topolog模式為分布式,這樣topology就可以放到JStorm集群上運行
34 
35 StormSubmitter.submitTopology(streamName, conf,
36                 builder.createTopology());
37 //提交topology

2、IRichSpout

IRichSpout 為最簡單的Spout接口

 1  IRichSpout{
 2 
 3     @Override
 4     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
 5     }
 6 
 7     @Override
 8     public void close() {
 9     }
10 
11     @Override
12     public void activate() {
13     }
14 
15     @Override
16     public void deactivate() {
17     }
18 
19     @Override
20     public void nextTuple() {
21     }
22 
23     @Override
24     public void ack(Object msgId) {
25     }
26 
27     @Override
28     public void fail(Object msgId) {
29     }
30 
31     @Override
32     public void declareOutputFields(OutputFieldsDeclarer declarer) {
33     }
34 
35     @Override
36     public Map<String, Object> getComponentConfiguration() {
37         return null;
38     }

 其中注意:

  • spout對象必須是繼承Serializable, 因此要求spout內所有數據結構必須是可序列化的
  • spout可以有構造函數,但構造函數只執行一次,是在提交任務時,創建spout對象,因此在task分配到具體worker之前的初始化工作可以在此處完成,一旦完成,初始化的內容將攜帶到每一個task內(因為提交任務時將spout序列化到文件中去,在worker起來時再將spout從文件中反序列化出來)。
  • open是當task起來后執行的初始化動作
  • close是當task被shutdown后執行的動作
  • activate 是當task被激活時,觸發的動作
  • deactivate 是task被deactive時,觸發的動作
  • nextTuple 是spout實現核心, nextuple完成自己的邏輯,即每一次取消息后,用collector 將消息emit出去。
  • ack, 當spout收到一條ack消息時,觸發的動作,詳情可以參考 ack機制
  • fail, 當spout收到一條fail消息時,觸發的動作,詳情可以參考 ack機制
  • declareOutputFields, 定義spout發送數據,每個字段的含義
  • getComponentConfiguration 獲取本spout的component 配置

3、Bolt

 1 IRichBolt {
 2 
 3     @Override
 4     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
 5     }
 6 
 7     @Override
 8     public void execute(Tuple input) {
 9     }
10 
11     @Override
12     public void cleanup() {
13     }
14 
15     @Override
16     public void declareOutputFields(OutputFieldsDeclarer declarer) {
17     }
18 
19     @Override
20     public Map<String, Object> getComponentConfiguration() {
21         return null;
22     }
23 
24 }

 其中注意:

  • bolt對象必須是繼承Serializable, 因此要求spout內所有數據結構必須是可序列化的
  • bolt可以有構造函數,但構造函數只執行一次,是在提交任務時,創建bolt對象,因此在task分配到具體worker之前的初始化工作可以在此處完成,一旦完成,初始化的內容將攜帶到每一個task內(因為提交任務時將bolt序列化到文件中去,在worker起來時再將bolt從文件中反序列化出來)。
  • prepare是當task起來后執行的初始化動作
  • cleanup是當task被shutdown后執行的動作
  • execute是bolt實現核心, 完成自己的邏輯,即接受每一次取消息后,處理完,有可能用collector 將產生的新消息emit出去。 ** 在executor中,當程序處理一條消息時,需要執行collector.ack, 詳情可以參考 ack機制 ** 在executor中,當程序無法處理一條消息時或出錯時,需要執行collector.fail ,詳情可以參考 ack機制
  • declareOutputFields, 定義bolt發送數據,每個字段的含義
  • getComponentConfiguration 獲取本bolt的component 配置

4、編譯

在Maven中配置

 1         <dependency>
 2             <groupId>com.alibaba.jstorm</groupId>
 3             <artifactId>jstorm-client</artifactId>
 4             <version>0.9.3.1</version>
 5             <scope>provided</scope>
 6         </dependency> 
 7 
 8 
 9          <dependency>
10             <groupId>com.alibaba.jstorm</groupId>
11             <artifactId>jstorm-client-extension</artifactId>
12             <version>0.9.3.1</version>
13             <scope>provided</scope>
14         </dependency>

如果找不到jstorm-client和jstorm-client-extension包,可以自己下載jstorm源碼進行編譯,請參考 源碼編譯

打包時,需要將所有依賴打入到一個包中

 1 <build>
 2         <plugins>
 3 
 4             <plugin>
 5                 <artifactId>maven-assembly-plugin</artifactId>
 6                 <configuration>
 7                     <descriptorRefs>
 8                         <descriptorRef>jar-with-dependencies</descriptorRef>
 9                     </descriptorRefs>
10                     <archive>
11                         <manifest>
12                             <mainClass>storm.starter.SequenceTopology</mainClass>
13                         </manifest>
14                     </archive>
15                 </configuration>
16                 <executions>
17                     <execution>
18                         <id>make-assembly</id>
19                         <phase>package</phase>
20                         <goals>
21                             <goal>single</goal>
22                         </goals>
23                     </execution>
24                 </executions>
25             </plugin>
26             <plugin>
27                 <groupId>org.apache.maven.plugins</groupId>
28                 <artifactId>maven-compiler-plugin</artifactId>
29                 <configuration>
30                     <source>1.6</source>
31                     <target>1.6</target>
32                 </configuration>
33             </plugin>
34         </plugins>
35     </build>

5、提交jar

jstorm jar xxxxxx.jar com.alibaba.xxxx.xx parameter

  • xxxx.jar 為打包后的jar
  • com.alibaba.xxxx.xx 為入口類,即提交任務的類
  • parameter即為提交參數


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM