Storm的另一種常見模式是對流式數據進行所謂“streaming top N”的計算,它的特點是持續的在內存中按照某個統計指標(如出現次數)計算TOP N,然后每隔一定時間間隔輸出實時計算后的TOP N結果。
流式數據的TOP N計算的應用場景很多,例如計算twitter上最近一段時間內的熱門話題、熱門點擊圖片等等。
下面結合Storm-Starter中的例子,介紹一種可以很容易進行擴展的實現方法:首先,在多台機器上並行的運行多個Bolt,每個Bolt負責一部分數據的TOP N計算,然后再有一個全局的Bolt來合並這些機器上計算出來的TOP N結果,合並后得到最終全局的TOP N結果。
該部分示例代碼的入口是RollingTopWords類,用於計算文檔中出現次數最多的N個單詞。首先看一下這個Topology結構:
Topology構建的代碼如下:
TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("word", new TestWordSpout(), 5); builder.setBolt("count", new RollingCountObjects(60, 10), 4) .fieldsGrouping("word", new Fields("word")); builder.setBolt("rank", new RankObjects(TOP_N), 4) .fieldsGrouping("count", new Fields("obj")); builder.setBolt("merge", new MergeObjects(TOP_N)) .globalGrouping("rank");
(1)首先,TestWordSpout()是Topology的數據源Spout,持續隨機生成單詞發出去,產生數據流“word”,輸出Fields是“word”,核心代碼如下:
public void nextTuple() { Utils.sleep(100); final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"}; final Random rand = new Random(); final String word = words[rand.nextInt(words.length)]; _collector.emit(new Values(word)); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); }
(2)接下來,“word”流入RollingCountObjects這個Bolt中進行word count計算,為了保證同一個word的數據被發送到同一個Bolt中進行處理,按照“word”字段進行field grouping;在RollingCountObjects中會計算各個word的出現次數,然后產生“count”流,輸出“obj”和“count”兩個Field,核心代碼如下:
public void execute(Tuple tuple) { Object obj = tuple.getValue(0); int bucket = currentBucket(_numBuckets); synchronized(_objectCounts) { long[] curr = _objectCounts.get(obj); if(curr==null) { curr = new long[_numBuckets]; _objectCounts.put(obj, curr); } curr[bucket]++; _collector.emit(new Values(obj, totalObjects(obj))); _collector.ack(tuple); } } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("obj", "count")); }
(3)然后,RankObjects這個Bolt按照“count”流的“obj”字段進行field grouping;在Bolt內維護TOP N個有序的單詞,如果超過TOP N個單詞,則將排在最后的單詞踢掉,同時每個一定時間(2秒)產生“rank”流,輸出“list”字段,輸出TOP N計算結果到下一級數據流“merge”流,核心代碼如下:
public void execute(Tuple tuple, BasicOutputCollector collector) { Object tag = tuple.getValue(0); Integer existingIndex = _find(tag); if (null != existingIndex) { _rankings.set(existingIndex, tuple.getValues()); } else { _rankings.add(tuple.getValues()); } Collections.sort(_rankings, new Comparator<List>() { public int compare(List o1, List o2) { return _compare(o1, o2); } }); if (_rankings.size() > _count) { _rankings.remove(_count); } long currentTime = System.currentTimeMillis(); if(_lastTime==null || currentTime >= _lastTime + 2000) { collector.emit(new Values(new ArrayList(_rankings))); _lastTime = currentTime; } } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("list")); }
(4)最后,MergeObjects這個Bolt按照“rank”流的進行全局的grouping,即所有上一級Bolt產生的“rank”流都流到這個“merge”流進行;MergeObjects的計算邏輯和RankObjects類似,只是將各個RankObjects的Bolt合並后計算得到最終全局的TOP N結果,核心代碼如下:
public void execute(Tuple tuple, BasicOutputCollector collector) { List<List> merging = (List) tuple.getValue(0); for(List pair : merging) { Integer existingIndex = _find(pair.get(0)); if (null != existingIndex) { _rankings.set(existingIndex, pair); } else { _rankings.add(pair); } Collections.sort(_rankings, new Comparator<List>() { public int compare(List o1, List o2) { return _compare(o1, o2); } }); if (_rankings.size() > _count) { _rankings.subList(_count, _rankings.size()).clear(); } } long currentTime = System.currentTimeMillis(); if(_lastTime==null || currentTime >= _lastTime + 2000) { collector.emit(new Values(new ArrayList(_rankings))); LOG.info("Rankings: " + _rankings); _lastTime = currentTime; } } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("list")); }
關於上述例子的幾點說明:
(1) 為什么要有RankObjects和MergeObjects兩級的Bolt來計算呢?
其實,計算TOP N的一個最簡單的思路是直接使用一個Bolt(通過類似於RankObjects的類實現)來做全局的求TOP N操作。
但是,這種方式的明顯缺點在於受限於單台機器的處理能力。
(2) 如何保證計算結果的正確性?
首先通過field grouping將同一個word的計算放到同一個Bolt上處理;最后有一個全局的global grouping匯總得到TOP N。
這樣可以做到最大可能並行性,同時也能保證計算結果的正確。
(3) 如果當前計算資源無法滿足計算TOP N,該怎么辦?
這個問題本質上就是系統的可擴展性問題,基本的解決方法就是盡可能做到在多個機器上的並行計算過程,針對上面的Topology結構:
a) 可以通過增加每一級處理單元Bolt的數量,減少每個Bolt處理的數據規模;
b) 可以通過增加一級或多級Bolt處理單元,減少最終匯總處理的數據規模。