1.全局定時器
1 import java.util.Map; 2 3 import backtype.storm.Config; 4 import backtype.storm.Constants; 5 import backtype.storm.LocalCluster; 6 import backtype.storm.spout.SpoutOutputCollector; 7 import backtype.storm.task.OutputCollector; 8 import backtype.storm.task.TopologyContext; 9 import backtype.storm.topology.OutputFieldsDeclarer; 10 import backtype.storm.topology.TopologyBuilder; 11 import backtype.storm.topology.base.BaseRichBolt; 12 import backtype.storm.topology.base.BaseRichSpout; 13 import backtype.storm.tuple.Fields; 14 import backtype.storm.tuple.Tuple; 15 import backtype.storm.tuple.Values; 16 import backtype.storm.utils.Utils; 17 18 /** 19 * 全局定時器 20 * 21 * 數字累加求和 22 * 先添加storm依賴 23 * 24 * @author Administrator 25 * 26 */ 27 public class LocalTopologySumTimer1 { 28 29 30 /** 31 * spout需要繼承baserichspout,實現未實現的方法 32 * @author Administrator 33 * 34 */ 35 public static class MySpout extends BaseRichSpout{ 36 private Map conf; 37 private TopologyContext context; 38 private SpoutOutputCollector collector; 39 40 /** 41 * 初始化方法,只會執行一次 42 * 在這里面可以寫一個初始化的代碼 43 * Map conf:其實里面保存的是topology的一些配置信息 44 * TopologyContext context:topology的上下文,類似於servletcontext 45 * SpoutOutputCollector collector:發射器,負責向外發射數據(tuple) 46 */ 47 @Override 48 public void open(Map conf, TopologyContext context, 49 SpoutOutputCollector collector) { 50 this.conf = conf; 51 this.context = context; 52 this.collector = collector; 53 } 54 55 int num = 1; 56 /** 57 * 這個方法是spout中最重要的方法, 58 * 這個方法會被storm框架循環調用,可以理解為這個方法是在一個while循環之內 59 * 每調用一次,會向外發射一條數據 60 */ 61 @Override 62 public void nextTuple() { 63 System.out.println("spout發射:"+num); 64 //把數據封裝到values中,稱為一個tuple,發射出去 65 this.collector.emit(new Values(num++)); 66 Utils.sleep(1000); 67 } 68 69 /** 70 * 聲明輸出字段 71 */ 72 @Override 73 public void declareOutputFields(OutputFieldsDeclarer declarer) { 74 //給values中的數據起個名字,方便后面的bolt從這個values中取數據 75 //fields中定義的參數和values中傳遞的數值是一一對應的 76 declarer.declare(new Fields("num")); 77 } 78 79 } 80 81 82 /** 83 * 自定義bolt需要實現baserichbolt 84 * @author Administrator 85 * 86 */ 87 public static class MyBolt extends BaseRichBolt{ 88 private Map stormConf; 89 private TopologyContext context; 90 private OutputCollector collector; 91 92 /** 93 * 和spout中的open方法意義一樣 94 */ 95 @Override 96 public void prepare(Map stormConf, TopologyContext context, 97 OutputCollector collector) { 98 this.stormConf = stormConf; 99 this.context = context; 100 this.collector = collector; 101 } 102 103 int sum = 0; 104 /** 105 * 是bolt中最重要的方法,當spout發射一個tuple出來,execute也會被調用,需要對spout發射出來的tuple進行處理 106 */ 107 @Override 108 public void execute(Tuple input) { 109 if(input.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)){ 110 //如果滿足,就說明這個tuple是系統幾倍的組件發送的,也就意味着定時時間到了 111 System.out.println("定時任務執行了。"); 112 113 }else{//這個地方必須要做判斷,否則讓系統級別的tuple去取"num"會取不到報錯的. 114 //這個地方的邏輯可以將產生的數據封裝成一個map或者是list放在內存中.到達定時任務的時候取出來,使用batch批處理向數據庫中操作. 115 //然后再把集合中的數據清空...之后再添加. 116 117 //input.getInteger(0);//也可以根據角標獲取tuple中的數據 118 Integer value = input.getIntegerByField("num"); 119 sum+=value; 120 System.out.println("和:"+sum); 121 } 122 123 } 124 125 /** 126 * 聲明輸出字段 127 */ 128 @Override 129 public void declareOutputFields(OutputFieldsDeclarer declarer) { 130 //在這沒必要定義了,因為execute方法中沒有向外發射tuple,所以就不需要聲明了。 131 //如果nextTuple或者execute方法中向外發射了tuple,那么declareOutputFields必須要聲明,否則不需要聲明 132 } 133 134 } 135 /** 136 * 注意:在組裝topology的時候,組件的id在定義的時候,名稱不能以__開頭。__是系統保留的 137 * @param args 138 */ 139 public static void main(String[] args) { 140 //組裝topology 141 TopologyBuilder topologyBuilder = new TopologyBuilder(); 142 topologyBuilder.setSpout("spout1", new MySpout()); 143 //.shuffleGrouping("spout1"); 表示讓MyBolt接收MySpout發射出來的tuple 144 topologyBuilder.setBolt("bolt1", new MyBolt()).shuffleGrouping("spout1"); 145 146 //創建本地storm集群 147 LocalCluster localCluster = new LocalCluster(); 148 Config config = new Config(); 149 //下面這樣設置就是一個全局的定時任務 還有局部的定時任務. 150 config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5);//表示每隔5秒storm會給Topology中的所有bolt發射一個系統級別的tuple 151 //前面的單詞計數的例子 我們可能只需要在最后一個CountBolt中做定時任務 SpiltBolt中不需要做定時任務 但是兩個Bolt中都可以收到這個系統級別的tuple 152 //所以需要每個Bolt中都做判斷...SplitBolt可以加上一個判斷 沒有方法體...if(input.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)){ } 153 //否則會出錯...從系統級別的tuple取你定義的值 取不到 報錯. 154 localCluster.submitTopology("sumTopology", config, topologyBuilder.createTopology()); 155 156 } 157 158 159 }
局部定時器
1 /** 2 * 局部定時器 3 * 4 * 數字累加求和 5 * 先添加storm依賴 6 * 7 * @author Administrator 8 * 9 */ 10 public class LocalTopologySumTimer2 { 11 12 13 /** 14 * spout需要繼承baserichspout,實現未實現的方法 15 * @author Administrator 16 * 17 */ 18 public static class MySpout extends BaseRichSpout{ 19 private Map conf; 20 private TopologyContext context; 21 private SpoutOutputCollector collector; 22 23 /** 24 * 初始化方法,只會執行一次 25 * 在這里面可以寫一個初始化的代碼 26 * Map conf:其實里面保存的是topology的一些配置信息 27 * TopologyContext context:topology的上下文,類似於servletcontext 28 * SpoutOutputCollector collector:發射器,負責向外發射數據(tuple) 29 */ 30 @Override 31 public void open(Map conf, TopologyContext context, 32 SpoutOutputCollector collector) { 33 this.conf = conf; 34 this.context = context; 35 this.collector = collector; 36 } 37 38 int num = 1; 39 /** 40 * 這個方法是spout中最重要的方法, 41 * 這個方法會被storm框架循環調用,可以理解為這個方法是在一個while循環之內 42 * 每調用一次,會向外發射一條數據 43 */ 44 @Override 45 public void nextTuple() { 46 System.out.println("spout發射:"+num); 47 //把數據封裝到values中,稱為一個tuple,發射出去 48 this.collector.emit(new Values(num++)); 49 Utils.sleep(1000); 50 } 51 52 /** 53 * 聲明輸出字段 54 */ 55 @Override 56 public void declareOutputFields(OutputFieldsDeclarer declarer) { 57 //給values中的數據起個名字,方便后面的bolt從這個values中取數據 58 //fields中定義的參數和values中傳遞的數值是一一對應的 59 declarer.declare(new Fields("num")); 60 } 61 62 } 63 64 65 /** 66 * 自定義bolt需要實現baserichbolt 67 * @author Administrator 68 * 69 */ 70 public static class MyBolt extends BaseRichBolt{ 71 private Map stormConf; 72 private TopologyContext context; 73 private OutputCollector collector; 74 75 /** 76 * 和spout中的open方法意義一樣 77 */ 78 @Override 79 public void prepare(Map stormConf, TopologyContext context, 80 OutputCollector collector) { 81 this.stormConf = stormConf; 82 this.context = context; 83 this.collector = collector; 84 } 85 86 int sum = 0; 87 /** 88 * 是bolt中最重要的方法,當spout發射一個tuple出來,execute也會被調用,需要對spout發射出來的tuple進行處理 89 */ 90 @Override 91 public void execute(Tuple input) { 92 if(input.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)){ 93 //如果滿足,就說明這個tuple是系統幾倍的組件發送的,也就意味着定時時間到了 94 System.out.println("定時任務執行了。"); 95 96 }else{ 97 //input.getInteger(0);//也可以根據角標獲取tuple中的數據 98 Integer value = input.getIntegerByField("num"); 99 sum+=value; 100 System.out.println("和:"+sum); 101 } 102 103 } 104 105 /** 106 * 聲明輸出字段 107 */ 108 @Override 109 public void declareOutputFields(OutputFieldsDeclarer declarer) { 110 //在這沒必要定義了,因為execute方法中沒有向外發射tuple,所以就不需要聲明了。 111 //如果nextTuple或者execute方法中向外發射了tuple,那么declareOutputFields必須要聲明,否則不需要聲明 112 } 113 114 /** 115 * 局部定時任務 116 * 只針對當前的bolt 對其他的bolt中沒有影響 117 * 加對系統級別tuple的判斷只需要在當前bolt中判斷就可以...其他bolt不需要.. 118 * 這種在工作中最常用.... 119 * 全局定時任務在 main方法中 設置 局部的定時任務只需要在Bolt類中覆蓋getComponentConfiguration()方法 120 * 這個還是比較有用,有意思的 121 */ 122 @Override 123 public Map<String, Object> getComponentConfiguration() { 124 HashMap<String, Object> hashMap = new HashMap<String, Object>(); 125 hashMap.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5); 126 return hashMap; 127 } 128 } 129 /** 130 * 注意:在組裝topology的時候,組件的id在定義的時候,名稱不能以__開頭。__是系統保留的 131 * @param args 132 */ 133 public static void main(String[] args) { 134 //組裝topology 135 TopologyBuilder topologyBuilder = new TopologyBuilder(); 136 topologyBuilder.setSpout("spout1", new MySpout()); 137 //.shuffleGrouping("spout1"); 表示讓MyBolt接收MySpout發射出來的tuple 138 topologyBuilder.setBolt("bolt1", new MyBolt()).shuffleGrouping("spout1"); 139 140 //創建本地storm集群 141 LocalCluster localCluster = new LocalCluster(); 142 Config config = new Config(); 143 localCluster.submitTopology("sumTopology", config, topologyBuilder.createTopology()); 144 145 } 146 147 148 }