一. 簡介
storm作為流計算,處理數據通常以數據驅動。即只有當spout發射數據才會進行計算。那么如果想要做定時任務如何處理哪,例如有的bolt需要輸出一段時間統計的結果,這里一段時間可能是幾秒、幾分鍾或者幾小時。如果還是以數據進行驅動的話必然會輸出時間不可確定。同樣也可以啟一個線程一段時間執行一次,這也是一種解決方案。但是我希望有種更優雅的解決方案,就是這里說的tick。tick是由storm框架進行計時,到達設定時間會發送一個特殊的tuple:ticktuple,此時處理定時任務就可以了。
二. 代碼
如果是某一個bolt由定時需求的話,可以按照一下方式設置
- 繼承BaseBasicBolt
- getComponentConfiguration方法設置發送ticktuple間隔時間(單位s)
- execute方法判斷tuple類型,如果為ticktuple處理定時任務,如果不是處理其他任務。
以下是wordCount中CountBolt代碼,每5s輸出各單詞統計的數據。
1 //繼承 BaseBasicBolt 2 public class CountTickBolt extends BaseBasicBolt { 3 private static final Logger logger = LoggerFactory.getLogger(CountTickBolt.class); 4 private Map<String, Integer> count; 5 private Long time; 6 7 @Override 8 public Map<String, Object> getComponentConfiguration() { 9 //設置發送ticktuple的時間間隔 10 Config conf = new Config(); 11 conf.put(conf.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5); 12 return conf; 13 } 14 15 @Override 16 public void prepare(Map stormConf, TopologyContext context) { 17 super.prepare(stormConf, context); 18 count = new HashMap<String, Integer>(); 19 time = System.currentTimeMillis(); 20 } 21 22 @Override 23 public void cleanup() { 24 super.cleanup(); 25 } 26 27 @Override 28 public void execute(Tuple input, BasicOutputCollector collector) { 29 //判斷是否為tickTuple 30 if (input.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) && 31 input.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)){ 32 //是,處理定時任務 33 Long nowTime = System.currentTimeMillis(); 34 Long useTime = nowTime - time; 35 StringBuffer sb = new StringBuffer(); 36 sb.append("======== Use Time :" + useTime + "========\r\n"); 37 for (Map.Entry wordCount : count.entrySet()){ 38 sb.append(wordCount.getKey() + "------>" + wordCount.getValue() + "\r\n"); 39 } 40 Long nnTime = System.currentTimeMillis(); 41 logger.info(sb.toString() + (nnTime - nowTime) ); 42 time = nnTime; 43 }else { 44 //否,處理其他數據 45 String word = input.getString(0); 46 if (count.containsKey(word)){ 47 int thisWordCont = count.get(word); 48 count.put(word, ++thisWordCont); 49 }else { 50 count.put(word,1); 51 } 52 } 53 } 54 55 @Override 56 public void declareOutputFields(OutputFieldsDeclarer declarer) { 57 58 }
三. 總結
以上是一個簡單的介紹,需要說明的是由於設置時間間隔是秒級的,所以在處理時會有毫秒級的誤差,通常是± 2ms。
以下是沒有介紹或者測試到的地方,在以后會補上。
- 如何設置此拓撲中所有的spout和bolt都定時處理。
- 由於是tuple類型數據,當正常tuple數據量過大時是否會造成tickTuple延時消費。
WordCout源碼:
https://github.com/youtNa/stormTick