滑動窗口在監控和統計應用的場景比較廣泛,比如每隔一段時間(10s)統計最近30s的請求量或者異常次數,根據請求或者異常次數采取相應措施。在storm1.0版本之前,沒有提供關於滑動窗口的實現,需要開發者自己實現滑動窗口的功能(storm1.0以前實現滑動窗口的實現原理可以自行百度)。
這里主要演示在storm1.0以后如何通過繼承storm1.0提供的類來快速開發出窗口滑動的功能。窗口可以從時間或數量上來划分,由如下兩個因素決定:窗口的長度,可以是時間間隔或Tuple數量;滑動間隔(sliding Interval),可以是時間間隔或Tuple數量。比如:每兩秒統計最近6秒的請求數量;每接收2個Tuple就統計最近接收的6個Tuple的平均值......。
storm1.0支持的時間和數量的排列組合有如下:
withWindow(Count windowLength, Count slidingInterval)
每收到slidingInterval條數據統計最近的windowLength條數據。
withWindow(Count windowLength)
每收到1條數據統計最近的windowLength條數據。
withWindow(Count windowLength, Duration slidingInterval)
每過slidingInterval秒統計最近的windowLength條數據。
withWindow(Duration windowLength, Count slidingInterval)
每收到slidingInterval條數據統計最近的windowLength秒的數據。
withWindow(Duration windowLength, Duration slidingInterval)
每過slidingInterval秒統計最近的windowLength秒的數據。
public withWindow(Duration windowLength)
每收到1條數據統計最近的windowLength秒的數據。
接下來,簡單的演示如何使用storm1.0實現滑動窗口的功能,先編寫spout類,RandomSentenceSpout負責發送一個整形數值,數值每次發送都會自動加一,且RandomSentenceSpout固定每隔兩秒向bolt發送一次數據。RandomSentenceSpout和前面關於spout的講解一樣。
1.public class RandomSentenceSpout extends BaseRichSpout { 2. 3. private static final long serialVersionUID = 5028304756439810609L; 4. 5. private SpoutOutputCollector collector; 6. 7. int intsmaze=0; 8. 9. public void declareOutputFields(OutputFieldsDeclarer declarer) { 10. declarer.declare(new Fields("intsmaze")); 11. } 12. 13. public void open(Map conf, TopologyContext context, 14. SpoutOutputCollector collector) { 15. this.collector = collector; 16. } 17. 18. public void nextTuple() { 19. System.out.println("發送數據:"+intsmaze); 20. collector.emit(new Values(intsmaze++)); 21. try { 22. Thread.sleep(2000); 23.// Thread.sleep(1000); 24. } catch (InterruptedException e) { 25. e.printStackTrace(); 26. } 27. } }
滑動窗口的邏輯實現的重點是bolt類,這里我們編寫SlidingWindowBolt類讓它繼承一個新的類名為BaseWindowedBolt來獲得窗口計數的功能。BaseWindowedBolt和前面的BaseBaseBolt和BaseWindowedBolt提供的方法名都一樣,只是execute方法的參數類型為TupleWindow,TupleWindow參數里面裝載了一個窗口長度類的tuple數據。通過對TupleWindow遍歷,我們可以計算這一個窗口內tuple數的平均值或總和等指標。具體見代碼12-16行,統計了一個窗口內的數值型數據的總和。
1.public class SlidingWindowBolt extends BaseWindowedBolt { 2. 3. private OutputCollector collector; 4. 5. @Override 6. public void prepare(Map stormConf, TopologyContext context, 7. OutputCollector collector) { 8. this.collector = collector; 9. } 10. 11. public void execute(TupleWindow inputWindow) { 12. int sum=0; 13. System.out.print("一個窗口內的數據"); 14. for(Tuple tuple: inputWindow.get()) { 15. int str=(Integer) tuple.getValueByField("intsmaze"); 16. System.out.print(" "+str); 17. sum+=str; 18. } 19. System.out.println("======="+sum); 20. // collector.emit(new Values(sum)); 21. } 22. 23. @Override 24. public void declareOutputFields(OutputFieldsDeclarer declarer) { 25.// declarer.declare(new Fields("count")); 26. } }
我們已經實現了窗口計數的邏輯代碼,現在我們需要提供topology來指明各個組件的關系,以及指定SlidingWindowBolt的窗口的組合,這里我們演示了如何每兩秒統計最近6秒的數值總和,如果注釋掉10-13行代碼,去掉5-8行的注釋,這個topology就是告訴SlidingWindowBolt每接收到兩條tuple就統計最近接收到的6條tuple的數值的總和。
1.public class WindowsTopology { 2. 3. public static void main(String[] args) throws Exception { 4. TopologyBuilder builder = new TopologyBuilder(); 5. builder.setSpout("spout1", new RandomSentenceSpout(), 1); 6.// builder.setBolt("slidingwindowbolt", new SlidingWindowBolt() 7.// .withWindow(new Count(6), new Count(2)),1) 8.// .shuffleGrouping("spout"); 9.//滑窗 窗口長度:tuple數, 滑動間隔: tuple數 每收到2條數據統計當前6條數據的總和。 10. 11. builder.setBolt("slidingwindowbolt", new SlidingWindowBolt() 12. .withWindow(new Duration(6, TimeUnit.SECONDS), 13. new Duration(2, TimeUnit.SECONDS)),1) 14. .shuffleGrouping("spout");//每兩秒統計最近6秒的數據 15. 16. Config conf = new Config(); 17. conf.setNumWorkers(1); 18. LocalCluster cluster = new LocalCluster(); 19. cluster.submitTopology("word-count", conf, builder.createTopology()); 20. } }
這里演示的是bolt節點並發度為1的窗口功能,實際生產中,因為數據量很大,往往將bolt節點的並發度設置為多個,這個時候我們的SlidingWindowBolt就無法統計出一個窗口的數值總和了。因為每一個bolt的並行節點只能統計自己一個窗口接收到數據的總和,無法統計出一個窗口內全局數據的總和,借助redis來實現是可以的,但是必須引入redis的事務機制或者借助分布式鎖,否則會出現臟數據的情況。在這里我們介紹另一種實現方式就是靈活的使用storm提供的窗口功能,只是窗口的tuple數。
仍然是使用上面提供的類,只是我們增加一個bolt類,來統計每個SlidingWindowBolt節點發送給它的數值。
1.public class CountWord extends BaseWindowedBolt{ 2. 3. private static final long serialVersionUID = -5283595260540124273L; 4. 5. private OutputCollector collector; 6. 7. public void prepare(Map stormConf, TopologyContext context 8. , OutputCollector collector) { 9. this.collector = collector; 10. } 11. 12. public void execute(TupleWindow inputWindow) { 13. int sum=0; 14. for(Tuple tuple: inputWindow.get()) { 15. int i=(Integer) tuple.getValueByField("count"); 16. System.out.println("接收到一個bolt的總和值為:"+i); 17. sum+=i; 18. } 19. System.out.println("一個窗口內的總值為:"+sum); 20. } 21. 22. public void declareOutputFields(OutputFieldsDeclarer declarer) { 23. } }
然后我們注釋RandomSentenceSpout第22行代碼,取消對23行代碼的注釋,方便觀察結果。去掉SlidingWindowBolt類20和25行代碼。
topology啟動類如下:
1.public class WindowsTopology { 2. 3. public static void main(String[] args) throws Exception { 4. TopologyBuilder builder = new TopologyBuilder(); 5. builder.setSpout("spout", new RandomSentenceSpout(), 1); 6. 7. builder.setBolt("slidingwindowbolt", new SlidingWindowBolt() 8. .withWindow(new Duration(6, TimeUnit.SECONDS), 9. new Duration(2, TimeUnit.SECONDS)),2) 10. .shuffleGrouping("spout");//每兩秒統計最近6秒的數據 11. 12. builder.setBolt("countwordbolt", new CountWord() 13. .withWindow(new Count(2), new Count(2)),1) 14. .shuffleGrouping("slidingwindowbolt"); 15. //每收到2條tuple就統計最近兩條統的數據 16. Config conf = new Config(); 17. conf.setNumWorkers(1); 18. LocalCluster cluster = new LocalCluster(); 19. cluster.submitTopology("word-count", conf, builder.createTopology()); 20. } }