storm 1.0版本滑動窗口的實現及原理


滑動窗口在監控和統計應用的場景比較廣泛,比如每隔一段時間(10s)統計最近30s的請求量或者異常次數,根據請求或者異常次數采取相應措施。在storm1.0版本之前,沒有提供關於滑動窗口的實現,需要開發者自己實現滑動窗口的功能(storm1.0以前實現滑動窗口的實現原理可以自行百度)。

原文和作者一起討論:http://www.cnblogs.com/intsmaze/p/6481588.html

微信:intsmaze

這里主要演示在storm1.0以后如何通過繼承storm1.0提供的類來快速開發出窗口滑動的功能。窗口可以從時間或數量上來划分,由如下兩個因素決定:窗口的長度,可以是時間間隔或Tuple數量;滑動間隔(sliding Interval),可以是時間間隔或Tuple數量。比如:每兩秒統計最近6秒的請求數量;每接收2個Tuple就統計最近接收的6個Tuple的平均值......。

storm1.0支持的時間和數量的排列組合有如下:

withWindow(Count windowLength, Count slidingInterval)

  每收到slidingInterval條數據統計最近的windowLength條數據。

withWindow(Count windowLength)

  每收到1條數據統計最近的windowLength條數據。

withWindow(Count windowLength, Duration slidingInterval)

  每過slidingInterval秒統計最近的windowLength條數據。

withWindow(Duration windowLength, Count slidingInterval)

  每收到slidingInterval條數據統計最近的windowLength秒的數據。

withWindow(Duration windowLength, Duration slidingInterval)

  每過slidingInterval秒統計最近的windowLength秒的數據。

public withWindow(Duration windowLength)

  每收到1條數據統計最近的windowLength秒的數據。

接下來,簡單的演示如何使用storm1.0實現滑動窗口的功能,先編寫spout類,RandomSentenceSpout負責發送一個整形數值,數值每次發送都會自動加一,且RandomSentenceSpout固定每隔兩秒向bolt發送一次數據。RandomSentenceSpout和前面關於spout的講解一樣。

1.public class RandomSentenceSpout extends BaseRichSpout {
2.
3.    private static final long serialVersionUID = 5028304756439810609L;  
4.
5.    private SpoutOutputCollector collector;  
6.
7.    int intsmaze=0;
8.
9.    public void declareOutputFields(OutputFieldsDeclarer declarer) {
10.        declarer.declare(new Fields("intsmaze"));
11.    }
12.
13.    public void open(Map conf, TopologyContext context, 
14.                          SpoutOutputCollector collector) {
15.        this.collector = collector;
16.    }
17.
18.    public void nextTuple() {
19.        System.out.println("發送數據:"+intsmaze);
20.        collector.emit(new Values(intsmaze++));
21.        try {
22.            Thread.sleep(2000);
23.//         Thread.sleep(1000);
24.        } catch (InterruptedException e) {
25.            e.printStackTrace();
26.        }
27.    }
}

滑動窗口的邏輯實現的重點是bolt類,這里我們編寫SlidingWindowBolt類讓它繼承一個新的類名為BaseWindowedBolt來獲得窗口計數的功能。BaseWindowedBolt和前面的BaseBaseBoltBaseWindowedBolt提供的方法名都一樣,只是execute方法的參數類型為TupleWindow,TupleWindow參數里面裝載了一個窗口長度類的tuple數據。通過對TupleWindow遍歷,我們可以計算這一個窗口內tuple數的平均值或總和等指標。具體見代碼12-16行,統計了一個窗口內的數值型數據的總和。

1.public class SlidingWindowBolt extends BaseWindowedBolt {
2.
3.    private OutputCollector collector;
4.
5.    @Override
6.    public void prepare(Map stormConf, TopologyContext context, 
7.            OutputCollector collector) {
8.        this.collector = collector;
9.    }
10.
11.    public void execute(TupleWindow inputWindow) {        
12.        int sum=0;
13.        System.out.print("一個窗口內的數據");
14.        for(Tuple tuple: inputWindow.get()) {
15.            int str=(Integer) tuple.getValueByField("intsmaze");
16.            System.out.print(" "+str);
17.            sum+=str;
18.        }
19.        System.out.println("======="+sum);
20. //        collector.emit(new Values(sum));
21.    }
22.
23.    @Override
24.    public void declareOutputFields(OutputFieldsDeclarer declarer) {
25.//       declarer.declare(new Fields("count"));
26.    }
}

我們已經實現了窗口計數的邏輯代碼,現在我們需要提供topology來指明各個組件的關系,以及指定SlidingWindowBolt的窗口的組合,這里我們演示了如何每兩秒統計最近6秒的數值總和,如果注釋掉10-13行代碼,去掉5-8行的注釋,這個topology就是告訴SlidingWindowBolt每接收到兩條tuple就統計最近接收到的6條tuple的數值的總和。

1.public class WindowsTopology {
2.
3.    public static void main(String[] args) throws Exception {
4.       TopologyBuilder builder = new TopologyBuilder();
5.       builder.setSpout("spout1", new RandomSentenceSpout(), 1);
6.//       builder.setBolt("slidingwindowbolt", new SlidingWindowBolt()
7.//       .withWindow(new Count(6), new Count(2)),1)
8.//       .shuffleGrouping("spout");
9.//滑窗 窗口長度:tuple數, 滑動間隔: tuple數 每收到2條數據統計當前6條數據的總和。  
10.     
11.       builder.setBolt("slidingwindowbolt", new SlidingWindowBolt()
12.       .withWindow(new Duration(6, TimeUnit.SECONDS), 
13.               new Duration(2, TimeUnit.SECONDS)),1)
14.       .shuffleGrouping("spout");//每兩秒統計最近6秒的數據       
15.
16.       Config conf = new Config();
17.       conf.setNumWorkers(1);
18.       LocalCluster cluster = new LocalCluster();
19.       cluster.submitTopology("word-count", conf, builder.createTopology());
20.   }
}

這里演示的是bolt節點並發度為1的窗口功能,實際生產中,因為數據量很大,往往將bolt節點的並發度設置為多個,這個時候我們的SlidingWindowBolt就無法統計出一個窗口的數值總和了。因為每一個bolt的並行節點只能統計自己一個窗口接收到數據的總和,無法統計出一個窗口內全局數據的總和,借助redis來實現是可以的,但是必須引入redis的事務機制或者借助分布式鎖,否則會出現臟數據的情況。在這里我們介紹另一種實現方式就是靈活的使用storm提供的窗口功能,只是窗口的tuple數。

仍然是使用上面提供的類,只是我們增加一個bolt類,來統計每個SlidingWindowBolt節點發送給它的數值。

1.public class CountWord extends BaseWindowedBolt{
2.    
3.    private static final long serialVersionUID = -5283595260540124273L;
4.    
5.    private OutputCollector collector;
6.    
7.    public void prepare(Map stormConf, TopologyContext context
8.                             , OutputCollector collector) {
9.        this.collector = collector;
10.    }
11.    
12.    public void execute(TupleWindow inputWindow) {
13.         int sum=0;
14.         for(Tuple tuple: inputWindow.get()) {
15.             int i=(Integer) tuple.getValueByField("count");
16.               System.out.println("接收到一個bolt的總和值為:"+i);
17.               sum+=i;
18.          }
19.         System.out.println("一個窗口內的總值為:"+sum);
20.    }
21.
22.    public void declareOutputFields(OutputFieldsDeclarer declarer) {
23.    }
}

然后我們注釋RandomSentenceSpout22行代碼,取消對23行代碼的注釋,方便觀察結果。去掉SlidingWindowBolt20和25行代碼。

topology啟動類如下:

1.public class WindowsTopology {
2.
3.    public static void main(String[] args) throws Exception {
4.       TopologyBuilder builder = new TopologyBuilder();
5.       builder.setSpout("spout", new RandomSentenceSpout(), 1);
6.       
7.       builder.setBolt("slidingwindowbolt", new SlidingWindowBolt()
8.       .withWindow(new Duration(6, TimeUnit.SECONDS), 
9.               new Duration(2, TimeUnit.SECONDS)),2)
10.       .shuffleGrouping("spout");//每兩秒統計最近6秒的數據
11.       
12.       builder.setBolt("countwordbolt", new CountWord()
13.       .withWindow(new Count(2), new Count(2)),1)
14.       .shuffleGrouping("slidingwindowbolt");
15.       //每收到2條tuple就統計最近兩條統的數據
16.       Config conf = new Config();
17.       conf.setNumWorkers(1);
18.       LocalCluster cluster = new LocalCluster();
19.       cluster.submitTopology("word-count", conf, builder.createTopology());
20.   }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM