storm定時任務【tick】


一. 簡介
     storm作為流計算,處理數據通常以數據驅動。即只有當spout發射數據才會進行計算。那么如果想要做定時任務如何處理哪,例如有的bolt需要輸出一段時間統計的結果,這里一段時間可能是幾秒、幾分鍾或者幾小時。如果還是以數據進行驅動的話必然會輸出時間不可確定。同樣也可以啟一個線程一段時間執行一次,這也是一種解決方案。但是我希望有種更優雅的解決方案,就是這里說的tick。tick是由storm框架進行計時,到達設定時間會發送一個特殊的tuple:ticktuple,此時處理定時任務就可以了。
二. 代碼
     如果是某一個bolt由定時需求的話,可以按照一下方式設置
  1. 繼承BaseBasicBolt
  2. getComponentConfiguration方法設置發送ticktuple間隔時間(單位s)
  3. execute方法判斷tuple類型,如果為ticktuple處理定時任務,如果不是處理其他任務。
以下是wordCount中CountBolt代碼,每5s輸出各單詞統計的數據。
 1 //繼承 BaseBasicBolt
 2 public class CountTickBolt extends BaseBasicBolt {
 3     private static final Logger logger = LoggerFactory.getLogger(CountTickBolt.class);
 4     private Map<String, Integer> count;
 5     private Long time;
 6 
 7     @Override
 8     public Map<String, Object> getComponentConfiguration() {
 9         //設置發送ticktuple的時間間隔
10         Config conf = new Config();
11         conf.put(conf.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5);
12         return conf;
13     }
14 
15     @Override
16     public void prepare(Map stormConf, TopologyContext context) {
17         super.prepare(stormConf, context);
18         count = new HashMap<String, Integer>();
19         time = System.currentTimeMillis();
20     }
21 
22     @Override
23     public void cleanup() {
24         super.cleanup();
25     }
26 
27     @Override
28     public void execute(Tuple input, BasicOutputCollector collector) {
29         //判斷是否為tickTuple
30         if (input.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) &&
31                 input.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)){
32             //是,處理定時任務
33             Long nowTime = System.currentTimeMillis();
34             Long useTime = nowTime - time;
35             StringBuffer sb = new StringBuffer();
36             sb.append("======== Use Time :" + useTime + "========\r\n");
37             for (Map.Entry wordCount : count.entrySet()){
38                 sb.append(wordCount.getKey() + "------>" + wordCount.getValue() + "\r\n");
39             }
40             Long nnTime = System.currentTimeMillis();
41             logger.info(sb.toString() + (nnTime - nowTime) );
42             time = nnTime;
43         }else {
44             //否,處理其他數據
45             String word = input.getString(0);
46             if (count.containsKey(word)){
47                 int thisWordCont = count.get(word);
48                 count.put(word, ++thisWordCont);
49             }else {
50                 count.put(word,1);
51             }
52         }
53     }
54 
55     @Override
56     public void declareOutputFields(OutputFieldsDeclarer declarer) {
57 
58     }
三. 總結
     以上是一個簡單的介紹,需要說明的是由於設置時間間隔是秒級的,所以在處理時會有毫秒級的誤差,通常是± 2ms。
  以下是沒有介紹或者測試到的地方,在以后會補上。
  1. 如何設置此拓撲中所有的spout和bolt都定時處理。
  2. 由於是tuple類型數據,當正常tuple數據量過大時是否會造成tickTuple延時消費。
  WordCout源碼: https://github.com/youtNa/stormTick


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM