Storm常見模式——求TOP N


Storm的另一種常見模式是對流式數據進行所謂“streaming top N”的計算,它的特點是持續的在內存中按照某個統計指標(如出現次數)計算TOP N,然后每隔一定時間間隔輸出實時計算后的TOP N結果。

流式數據的TOP N計算的應用場景很多,例如計算twitter上最近一段時間內的熱門話題、熱門點擊圖片等等。

下面結合Storm-Starter中的例子,介紹一種可以很容易進行擴展的實現方法:首先,在多台機器上並行的運行多個Bolt,每個Bolt負責一部分數據的TOP N計算,然后再有一個全局的Bolt來合並這些機器上計算出來的TOP N結果,合並后得到最終全局的TOP N結果。

該部分示例代碼的入口是RollingTopWords類,用於計算文檔中出現次數最多的N個單詞。首先看一下這個Topology結構:

Topology構建的代碼如下:

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word", new TestWordSpout(), 5);
        builder.setBolt("count", new RollingCountObjects(60, 10), 4)
                 .fieldsGrouping("word", new Fields("word"));
        builder.setBolt("rank", new RankObjects(TOP_N), 4)
                 .fieldsGrouping("count", new Fields("obj"));
        builder.setBolt("merge", new MergeObjects(TOP_N))
                 .globalGrouping("rank");

1)首先,TestWordSpout()Topology的數據源Spout,持續隨機生成單詞發出去,產生數據流“word”,輸出Fields“word”,核心代碼如下:

    public void nextTuple() {
        Utils.sleep(100);
        final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
        final Random rand = new Random();
        final String word = words[rand.nextInt(words.length)];
        _collector.emit(new Values(word));
  }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
  }

2)接下來,“word”流入RollingCountObjects這個Bolt中進行word count計算,為了保證同一個word的數據被發送到同一個Bolt中進行處理,按照“word”字段進行field grouping;在RollingCountObjects中會計算各個word的出現次數,然后產生“count”流,輸出“obj”“count”兩個Field,核心代碼如下

    public void execute(Tuple tuple) {

        Object obj = tuple.getValue(0);
        int bucket = currentBucket(_numBuckets);
        synchronized(_objectCounts) {
            long[] curr = _objectCounts.get(obj);
            if(curr==null) {
                curr = new long[_numBuckets];
                _objectCounts.put(obj, curr);
            }
            curr[bucket]++;
            _collector.emit(new Values(obj, totalObjects(obj)));
            _collector.ack(tuple);
        }
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("obj", "count"));
    }

3)然后,RankObjects這個Bolt按照“count”流的“obj”字段進行field grouping;在Bolt內維護TOP N個有序的單詞,如果超過TOP N個單詞,則將排在最后的單詞踢掉,同時每個一定時間(2秒)產生“rank”流,輸出“list”字段,輸出TOP N計算結果到下一級數據流“merge”流,核心代碼如下:

    public void execute(Tuple tuple, BasicOutputCollector collector) {
        Object tag = tuple.getValue(0);
        Integer existingIndex = _find(tag);
        if (null != existingIndex) {
            _rankings.set(existingIndex, tuple.getValues());
        } else {
            _rankings.add(tuple.getValues());
        }
        Collections.sort(_rankings, new Comparator<List>() {
            public int compare(List o1, List o2) {
                return _compare(o1, o2);
            }
        });
        if (_rankings.size() > _count) {
            _rankings.remove(_count);
        }
        long currentTime = System.currentTimeMillis();
        if(_lastTime==null || currentTime >= _lastTime + 2000) {
            collector.emit(new Values(new ArrayList(_rankings)));
            _lastTime = currentTime;
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("list"));
    }

4)最后,MergeObjects這個Bolt按照“rank”流的進行全局的grouping,即所有上一級Bolt產生的“rank”流都流到這個“merge”流進行;MergeObjects的計算邏輯和RankObjects類似,只是將各個RankObjectsBolt合並后計算得到最終全局的TOP N結果,核心代碼如下:

    public void execute(Tuple tuple, BasicOutputCollector collector) {
        List<List> merging = (List) tuple.getValue(0);
        for(List pair : merging) {
            Integer existingIndex = _find(pair.get(0));
            if (null != existingIndex) {
                _rankings.set(existingIndex, pair);
            } else {
                _rankings.add(pair);
            }

            Collections.sort(_rankings, new Comparator<List>() {
                public int compare(List o1, List o2) {
                    return _compare(o1, o2);
                }
            });

            if (_rankings.size() > _count) {
                _rankings.subList(_count, _rankings.size()).clear();
            }
        }

        long currentTime = System.currentTimeMillis();
        if(_lastTime==null || currentTime >= _lastTime + 2000) {
            collector.emit(new Values(new ArrayList(_rankings)));
            LOG.info("Rankings: " + _rankings);
            _lastTime = currentTime;
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("list"));
    }

關於上述例子的幾點說明:

(1) 為什么要有RankObjectsMergeObjects兩級的Bolt來計算呢?

其實,計算TOP N的一個最簡單的思路是直接使用一個Bolt(通過類似於RankObjects的類實現)來做全局的求TOP N操作。

但是,這種方式的明顯缺點在於受限於單台機器的處理能力。

(2) 如何保證計算結果的正確性?

首先通過field grouping將同一個word的計算放到同一個Bolt上處理;最后有一個全局的global grouping匯總得到TOP N

這樣可以做到最大可能並行性,同時也能保證計算結果的正確。

(3) 如果當前計算資源無法滿足計算TOP N,該怎么辦?

這個問題本質上就是系統的可擴展性問題,基本的解決方法就是盡可能做到在多個機器上的並行計算過程,針對上面的Topology結構:

a) 可以通過增加每一級處理單元Bolt的數量,減少每個Bolt處理的數據規模;

b) 可以通過增加一級或多級Bolt處理單元,減少最終匯總處理的數據規模。

本文參考代碼見:https://github.com/nathanmarz/storm-starter


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM