Storm中的定時任務


 

1.全局定時器

  1 import java.util.Map;
  2 
  3 import backtype.storm.Config;
  4 import backtype.storm.Constants;
  5 import backtype.storm.LocalCluster;
  6 import backtype.storm.spout.SpoutOutputCollector;
  7 import backtype.storm.task.OutputCollector;
  8 import backtype.storm.task.TopologyContext;
  9 import backtype.storm.topology.OutputFieldsDeclarer;
 10 import backtype.storm.topology.TopologyBuilder;
 11 import backtype.storm.topology.base.BaseRichBolt;
 12 import backtype.storm.topology.base.BaseRichSpout;
 13 import backtype.storm.tuple.Fields;
 14 import backtype.storm.tuple.Tuple;
 15 import backtype.storm.tuple.Values;
 16 import backtype.storm.utils.Utils;
 17 
 18 /**
 19  * 全局定時器
 20  * 
 21  * 數字累加求和
 22  * 先添加storm依賴
 23  * 
 24  * @author Administrator
 25  *
 26  */
 27 public class LocalTopologySumTimer1 {
 28     
 29     
 30     /**
 31      * spout需要繼承baserichspout,實現未實現的方法
 32      * @author Administrator
 33      *
 34      */
 35     public static class MySpout extends BaseRichSpout{
 36         private Map conf;
 37         private TopologyContext context;
 38         private SpoutOutputCollector collector;
 39         
 40         /**
 41          * 初始化方法,只會執行一次
 42          * 在這里面可以寫一個初始化的代碼
 43          * Map conf:其實里面保存的是topology的一些配置信息
 44          * TopologyContext context:topology的上下文,類似於servletcontext
 45          * SpoutOutputCollector collector:發射器,負責向外發射數據(tuple)
 46          */
 47         @Override
 48         public void open(Map conf, TopologyContext context,
 49                 SpoutOutputCollector collector) {
 50             this.conf = conf;
 51             this.context = context;
 52             this.collector = collector;
 53         }
 54 
 55         int num = 1;
 56         /**
 57          * 這個方法是spout中最重要的方法,
 58          * 這個方法會被storm框架循環調用,可以理解為這個方法是在一個while循環之內
 59          * 每調用一次,會向外發射一條數據
 60          */
 61         @Override
 62         public void nextTuple() {
 63             System.out.println("spout發射:"+num);
 64             //把數據封裝到values中,稱為一個tuple,發射出去
 65             this.collector.emit(new Values(num++));
 66             Utils.sleep(1000);
 67         }
 68         
 69         /**
 70          * 聲明輸出字段
 71          */
 72         @Override
 73         public void declareOutputFields(OutputFieldsDeclarer declarer) {
 74             //給values中的數據起個名字,方便后面的bolt從這個values中取數據
 75             //fields中定義的參數和values中傳遞的數值是一一對應的
 76             declarer.declare(new Fields("num"));
 77         }
 78         
 79     }
 80     
 81     
 82     /**
 83      * 自定義bolt需要實現baserichbolt
 84      * @author Administrator
 85      *
 86      */
 87     public static class MyBolt extends BaseRichBolt{
 88         private Map stormConf; 
 89         private TopologyContext context;
 90         private OutputCollector collector;
 91         
 92         /**
 93          * 和spout中的open方法意義一樣
 94          */
 95         @Override
 96         public void prepare(Map stormConf, TopologyContext context,
 97                 OutputCollector collector) {
 98             this.stormConf = stormConf;
 99             this.context = context;
100             this.collector = collector;
101         }
102 
103         int sum = 0;
104         /**
105          * 是bolt中最重要的方法,當spout發射一個tuple出來,execute也會被調用,需要對spout發射出來的tuple進行處理
106          */
107         @Override
108         public void execute(Tuple input) {
109             if(input.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)){
110                 //如果滿足,就說明這個tuple是系統幾倍的組件發送的,也就意味着定時時間到了
111                 System.out.println("定時任務執行了。");
112                 
113             }else{//這個地方必須要做判斷,否則讓系統級別的tuple去取"num"會取不到報錯的.
114                 //這個地方的邏輯可以將產生的數據封裝成一個map或者是list放在內存中.到達定時任務的時候取出來,使用batch批處理向數據庫中操作.
115                 //然后再把集合中的數據清空...之后再添加.
116                 
117                 //input.getInteger(0);//也可以根據角標獲取tuple中的數據
118                 Integer value = input.getIntegerByField("num");
119                 sum+=value;
120                 System.out.println("和:"+sum);
121             }
122             
123         }
124         
125         /**
126          * 聲明輸出字段
127          */
128         @Override
129         public void declareOutputFields(OutputFieldsDeclarer declarer) {
130             //在這沒必要定義了,因為execute方法中沒有向外發射tuple,所以就不需要聲明了。
131             //如果nextTuple或者execute方法中向外發射了tuple,那么declareOutputFields必須要聲明,否則不需要聲明
132         }
133         
134     }
135     /**
136      * 注意:在組裝topology的時候,組件的id在定義的時候,名稱不能以__開頭。__是系統保留的
137      * @param args
138      */
139     public static void main(String[] args) {
140         //組裝topology
141         TopologyBuilder topologyBuilder = new TopologyBuilder();
142         topologyBuilder.setSpout("spout1", new MySpout());
143         //.shuffleGrouping("spout1"); 表示讓MyBolt接收MySpout發射出來的tuple
144         topologyBuilder.setBolt("bolt1", new MyBolt()).shuffleGrouping("spout1");
145         
146         //創建本地storm集群
147         LocalCluster localCluster = new LocalCluster();
148         Config config = new Config();
149         //下面這樣設置就是一個全局的定時任務  還有局部的定時任務.
150         config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5);//表示每隔5秒storm會給Topology中的所有bolt發射一個系統級別的tuple
151         //前面的單詞計數的例子 我們可能只需要在最后一個CountBolt中做定時任務  SpiltBolt中不需要做定時任務  但是兩個Bolt中都可以收到這個系統級別的tuple
152         //所以需要每個Bolt中都做判斷...SplitBolt可以加上一個判斷  沒有方法體...if(input.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)){  }
153         //否則會出錯...從系統級別的tuple取你定義的值 取不到 報錯.
154         localCluster.submitTopology("sumTopology", config, topologyBuilder.createTopology());
155         
156     }
157     
158 
159 }

 

 

 

 局部定時器

  1 /**
  2  * 局部定時器
  3  * 
  4  * 數字累加求和
  5  * 先添加storm依賴
  6  * 
  7  * @author Administrator
  8  *
  9  */
 10 public class LocalTopologySumTimer2 {
 11     
 12     
 13     /**
 14      * spout需要繼承baserichspout,實現未實現的方法
 15      * @author Administrator
 16      *
 17      */
 18     public static class MySpout extends BaseRichSpout{
 19         private Map conf;
 20         private TopologyContext context;
 21         private SpoutOutputCollector collector;
 22         
 23         /**
 24          * 初始化方法,只會執行一次
 25          * 在這里面可以寫一個初始化的代碼
 26          * Map conf:其實里面保存的是topology的一些配置信息
 27          * TopologyContext context:topology的上下文,類似於servletcontext
 28          * SpoutOutputCollector collector:發射器,負責向外發射數據(tuple)
 29          */
 30         @Override
 31         public void open(Map conf, TopologyContext context,
 32                 SpoutOutputCollector collector) {
 33             this.conf = conf;
 34             this.context = context;
 35             this.collector = collector;
 36         }
 37 
 38         int num = 1;
 39         /**
 40          * 這個方法是spout中最重要的方法,
 41          * 這個方法會被storm框架循環調用,可以理解為這個方法是在一個while循環之內
 42          * 每調用一次,會向外發射一條數據
 43          */
 44         @Override
 45         public void nextTuple() {
 46             System.out.println("spout發射:"+num);
 47             //把數據封裝到values中,稱為一個tuple,發射出去
 48             this.collector.emit(new Values(num++));
 49             Utils.sleep(1000);
 50         }
 51         
 52         /**
 53          * 聲明輸出字段
 54          */
 55         @Override
 56         public void declareOutputFields(OutputFieldsDeclarer declarer) {
 57             //給values中的數據起個名字,方便后面的bolt從這個values中取數據
 58             //fields中定義的參數和values中傳遞的數值是一一對應的
 59             declarer.declare(new Fields("num"));
 60         }
 61         
 62     }
 63     
 64     
 65     /**
 66      * 自定義bolt需要實現baserichbolt
 67      * @author Administrator
 68      *
 69      */
 70     public static class MyBolt extends BaseRichBolt{
 71         private Map stormConf; 
 72         private TopologyContext context;
 73         private OutputCollector collector;
 74         
 75         /**
 76          * 和spout中的open方法意義一樣
 77          */
 78         @Override
 79         public void prepare(Map stormConf, TopologyContext context,
 80                 OutputCollector collector) {
 81             this.stormConf = stormConf;
 82             this.context = context;
 83             this.collector = collector;
 84         }
 85 
 86         int sum = 0;
 87         /**
 88          * 是bolt中最重要的方法,當spout發射一個tuple出來,execute也會被調用,需要對spout發射出來的tuple進行處理
 89          */
 90         @Override
 91         public void execute(Tuple input) {
 92             if(input.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)){
 93                 //如果滿足,就說明這個tuple是系統幾倍的組件發送的,也就意味着定時時間到了
 94                 System.out.println("定時任務執行了。");
 95                 
 96             }else{
 97                 //input.getInteger(0);//也可以根據角標獲取tuple中的數據
 98                 Integer value = input.getIntegerByField("num");
 99                 sum+=value;
100                 System.out.println("和:"+sum);
101             }
102             
103         }
104         
105         /**
106          * 聲明輸出字段
107          */
108         @Override
109         public void declareOutputFields(OutputFieldsDeclarer declarer) {
110             //在這沒必要定義了,因為execute方法中沒有向外發射tuple,所以就不需要聲明了。
111             //如果nextTuple或者execute方法中向外發射了tuple,那么declareOutputFields必須要聲明,否則不需要聲明
112         }
113 
114         /**
115          * 局部定時任務  
116          * 只針對當前的bolt  對其他的bolt中沒有影響 
117          * 加對系統級別tuple的判斷只需要在當前bolt中判斷就可以...其他bolt不需要..
118          * 這種在工作中最常用....
119          * 全局定時任務在 main方法中 設置  局部的定時任務只需要在Bolt類中覆蓋getComponentConfiguration()方法
120          * 這個還是比較有用,有意思的
121          */
122         @Override
123         public Map<String, Object> getComponentConfiguration() {
124             HashMap<String, Object> hashMap = new HashMap<String, Object>();
125             hashMap.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5);
126             return hashMap;
127         }
128     }
129     /**
130      * 注意:在組裝topology的時候,組件的id在定義的時候,名稱不能以__開頭。__是系統保留的
131      * @param args
132      */
133     public static void main(String[] args) {
134         //組裝topology
135         TopologyBuilder topologyBuilder = new TopologyBuilder();
136         topologyBuilder.setSpout("spout1", new MySpout());
137         //.shuffleGrouping("spout1"); 表示讓MyBolt接收MySpout發射出來的tuple
138         topologyBuilder.setBolt("bolt1", new MyBolt()).shuffleGrouping("spout1");
139         
140         //創建本地storm集群
141         LocalCluster localCluster = new LocalCluster();
142         Config config = new Config();
143         localCluster.submitTopology("sumTopology", config, topologyBuilder.createTopology());
144         
145     }
146     
147 
148 }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM