創建maven項目,在pom.xml中加入以下配置:
<dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <type>jar</type> <version>0.9.3-rc1</version> </dependency>
創建SimpleSpout類用於獲取數據流:
1 package com.hirain.storm.helloworld; 2 3 import java.util.Map; 4 import java.util.Random; 5 6 import backtype.storm.spout.SpoutOutputCollector; 7 import backtype.storm.task.TopologyContext; 8 import backtype.storm.topology.OutputFieldsDeclarer; 9 import backtype.storm.topology.base.BaseRichSpout; 10 import backtype.storm.tuple.Fields; 11 import backtype.storm.tuple.Values; 12 13 public class SimpleSpout extends BaseRichSpout{ 14 15 /** 16 * 17 */ 18 private static final long serialVersionUID = 1L; 19 20 //用來發射數據的工具類 21 private SpoutOutputCollector collector; 22 23 private static String[] info = new String[]{ 24 "comaple\t,12424,44w46,654,12424,44w46,654,", 25 "lisi\t,435435,6537,12424,44w46,654,", 26 "lipeng\t,45735,6757,12424,44w46,654,", 27 "hujintao\t,45735,6757,12424,44w46,654,", 28 "jiangmin\t,23545,6457,2455,7576,qr44453", 29 "beijing\t,435435,6537,12424,44w46,654,", 30 "xiaoming\t,46654,8579,w3675,85877,077998,", 31 "xiaozhang\t,9789,788,97978,656,345235,09889,", 32 "ceo\t,46654,8579,w3675,85877,077998,", 33 "cto\t,46654,8579,w3675,85877,077998,", 34 "zhansan\t,46654,8579,w3675,85877,077998,"}; 35 36 Random random=new Random(); 37 38 39 /** 40 * 在SpoutTracker類中被調用,每調用一次就可以向storm集群中發射一條數據(一個tuple元組),該方法會被不停的調用 41 */ 42 public void nextTuple() { 43 try { 44 String msg = info[random.nextInt(11)]; 45 // 調用發射方法 46 collector.emit(new Values(msg)); 47 // 模擬等待100ms 48 Thread.sleep(100); 49 } catch (InterruptedException e) { 50 e.printStackTrace(); 51 } 52 } 53 /** 54 * 初始化collector 55 */ 56 public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { 57 this.collector = collector; 58 59 } 60 61 62 /** 63 * 定義字段id,該id在簡單模式下沒有用處,但在按照字段分組的模式下有很大的用處。 64 * 該declarer變量有很大作用,我們還可以調用declarer.declareStream();來定義stramId,該id可以用來定義更加復雜的流拓撲結構 65 */ 66 public void declareOutputFields(OutputFieldsDeclarer declarer) { 67 declarer.declare(new Fields("source")); //collector.emit(new Values(msg));參數要對應 68 } 69 70 }
創建SimpleBolt類,用於處理數據:
1 package com.hirain.storm.helloworld; 2 3 import backtype.storm.topology.BasicOutputCollector; 4 import backtype.storm.topology.OutputFieldsDeclarer; 5 import backtype.storm.topology.base.BaseBasicBolt; 6 import backtype.storm.tuple.Fields; 7 import backtype.storm.tuple.Tuple; 8 import backtype.storm.tuple.Values; 9 10 11 12 public class SimpleBolt extends BaseBasicBolt { 13 14 /** 15 * 16 */ 17 private static final long serialVersionUID = 1L; 18 19 public void execute(Tuple input,BasicOutputCollector collector) { 20 try { 21 String msg = input.getString(0); 22 if (msg != null){ 23 //System.out.println("msg="+msg); 24 collector.emit(new Values(msg + "msg is processed!")); 25 } 26 27 } catch (Exception e) { 28 e.printStackTrace(); 29 } 30 31 } 32 33 public void declareOutputFields( 34 OutputFieldsDeclarer declarer) { 35 declarer.declare(new Fields("info")); 36 37 } 38 39 }
創建main方法配置storm的topology並啟動本地模式運行:
1 package com.hirain.storm.helloworld; 2 3 import backtype.storm.Config; 4 import backtype.storm.LocalCluster; 5 import backtype.storm.StormSubmitter; 6 import backtype.storm.topology.TopologyBuilder; 7 8 public class SimpleTopology { 9 10 11 public static void main(String[] args) { 12 try { 13 // 實例化TopologyBuilder類。 14 TopologyBuilder topologyBuilder = new TopologyBuilder(); 15 // 設置噴發節點並分配並發數,該並發數將會控制該對象在集群中的線程數。 16 topologyBuilder.setSpout("SimpleSpout", new SimpleSpout(), 1); 17 // 設置數據處理節點並分配並發數。指定該節點接收噴發節點的策略為隨機方式。 18 topologyBuilder.setBolt("SimpleBolt", new SimpleBolt(), 3).shuffleGrouping("SimpleSpout"); 19 Config config = new Config(); 20 config.setDebug(true); 21 if (args != null && args.length > 0) { 22 config.setNumWorkers(1); 23 StormSubmitter.submitTopology(args[0], config, topologyBuilder.createTopology()); 24 } else { 25 // 這里是本地模式下運行的啟動代碼。 26 config.setMaxTaskParallelism(1); 27 LocalCluster cluster = new LocalCluster(); 28 cluster.submitTopology("simple", config, topologyBuilder.createTopology()); 29 } 30 31 } catch (Exception e) { 32 e.printStackTrace(); 33 } 34 } 35 }
以上為storm的簡單的helloworld,僅供參考