Having just started with Storm, I found the sliding-window top-N model hard to follow. While reading around I came across two blog posts covering the simpler non-sliding-window top-N, which I am reposting here.
Here is the first approach:
Another common Storm pattern is the so-called "streaming top N" computation: it continuously maintains, in memory, the top N items by some metric (such as occurrence count) and emits the up-to-date top-N result at a fixed interval.
Streaming top-N has many applications, for example computing the hottest recent topics or the most-clicked images on Twitter.
The example below, taken from Storm-Starter, shows an implementation that scales out easily: several bolts run in parallel across multiple machines, each computing the top N over its share of the data, and a single global bolt then merges those partial top-N results into the final global top N.
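The merge step itself is simple: concatenate the partial lists, re-sort by count, and keep the first N. A minimal standalone sketch in plain Java (the class and method names here are mine for illustration, not from Storm-Starter):

```java
import java.util.*;
import java.util.stream.*;

public class TopNMerge {
    // Each partial list holds (word, count) pairs from one parallel bolt.
    // Flatten all partial lists, sort by count descending, keep the first n.
    public static List<Map.Entry<String, Long>> mergeTopN(
            List<List<Map.Entry<String, Long>>> partials, int n) {
        return partials.stream()
                .flatMap(List::stream)
                .sorted((a, b) -> Long.compare(b.getValue(), a.getValue()))
                .limit(n)
                .collect(Collectors.toList());
    }
}
```

This works because each partial list already contains every candidate that could appear in the global top N for the words routed to that bolt.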
The entry point of this example is the RollingTopWords class, which computes the N most frequent words in the input. The topology is built as follows:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word", new TestWordSpout(), 5);
builder.setBolt("count", new RollingCountObjects(60, 10), 4)
       .fieldsGrouping("word", new Fields("word"));
builder.setBolt("rank", new RankObjects(TOP_N), 4)
       .fieldsGrouping("count", new Fields("obj"));
builder.setBolt("merge", new MergeObjects(TOP_N))
       .globalGrouping("rank");
(1) First, TestWordSpout() is the topology's data source spout. It continuously emits randomly chosen words, producing the "word" stream with a single output field, "word". Core code:
public void nextTuple() {
    Utils.sleep(100);
    final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
    final Random rand = new Random();
    final String word = words[rand.nextInt(words.length)];
    _collector.emit(new Values(word));
}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
}
(2) Next, the "word" stream flows into the RollingCountObjects bolt for word counting. Field grouping on "word" guarantees that tuples carrying the same word are processed by the same bolt instance. RollingCountObjects counts each word's occurrences and produces the "count" stream with two output fields, "obj" and "count". The synchronized block could also be replaced with a thread-safe container such as ConcurrentHashMap. Core code:
public void execute(Tuple tuple) {
    Object obj = tuple.getValue(0);
    int bucket = currentBucket(_numBuckets);
    synchronized (_objectCounts) {
        long[] curr = _objectCounts.get(obj);
        if (curr == null) {
            curr = new long[_numBuckets];
            _objectCounts.put(obj, curr);
        }
        curr[bucket]++;
        _collector.emit(new Values(obj, totalObjects(obj)));
        _collector.ack(tuple);
    }
}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("obj", "count"));
}
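As suggested above, the synchronized block can be avoided with concurrent containers. A hypothetical standalone sketch (class and method names are mine, not from Storm-Starter) of the same bucketed rolling count using ConcurrentHashMap and AtomicLongArray:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLongArray;

public class RollingCounter {
    private final int numBuckets;
    // One AtomicLongArray of per-bucket counts per tracked object.
    private final ConcurrentMap<Object, AtomicLongArray> counts =
            new ConcurrentHashMap<>();

    public RollingCounter(int numBuckets) {
        this.numBuckets = numBuckets;
    }

    // In the real bolt the bucket index is derived from the current time
    // (currentBucket(_numBuckets)); here it is passed in explicitly.
    public long increment(Object obj, int bucket) {
        AtomicLongArray buckets =
                counts.computeIfAbsent(obj, k -> new AtomicLongArray(numBuckets));
        buckets.incrementAndGet(bucket);
        return total(obj);
    }

    // Sum over all buckets, i.e. the rolling total for this object.
    public long total(Object obj) {
        AtomicLongArray buckets = counts.get(obj);
        if (buckets == null) return 0;
        long sum = 0;
        for (int i = 0; i < buckets.length(); i++) sum += buckets.get(i);
        return sum;
    }
}
```

Both `computeIfAbsent` and `incrementAndGet` are atomic, so no explicit lock is needed on the hot path.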
(3) Then, the RankObjects bolt consumes the "count" stream, field-grouped on "obj". It maintains an ordered list of at most TOP_N words, evicting the lowest-ranked word when the list overflows, and at a fixed interval (every 2 seconds) emits the current top-N result downstream on the "rank" stream, whose single output field is "list". Core code:
public void execute(Tuple tuple, BasicOutputCollector collector) {
    Object tag = tuple.getValue(0);
    Integer existingIndex = _find(tag);
    if (null != existingIndex) {
        _rankings.set(existingIndex, tuple.getValues());
    } else {
        _rankings.add(tuple.getValues());
    }
    Collections.sort(_rankings, new Comparator<List>() {
        public int compare(List o1, List o2) {
            return _compare(o1, o2);
        }
    });
    if (_rankings.size() > _count) {
        _rankings.remove(_count);
    }
    long currentTime = System.currentTimeMillis();
    if (_lastTime == null || currentTime >= _lastTime + 2000) {
        collector.emit(new Values(new ArrayList(_rankings)));
        _lastTime = currentTime;
    }
}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("list"));
}
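The update-sort-truncate bookkeeping above can be isolated into a small standalone sketch with no Storm dependency (hypothetical names; entries are modeled as `[word, count]` pairs, roughly like the tuple value lists in the real bolt):

```java
import java.util.*;

public class Rankings {
    private final int n;
    // Each entry is an Object[] of {word, count}.
    private final List<Object[]> rankings = new ArrayList<>();

    public Rankings(int n) {
        this.n = n;
    }

    public void update(String word, long count) {
        // Replace the entry for this word if present, otherwise append it.
        int existing = -1;
        for (int i = 0; i < rankings.size(); i++) {
            if (rankings.get(i)[0].equals(word)) { existing = i; break; }
        }
        Object[] entry = new Object[] { word, count };
        if (existing >= 0) rankings.set(existing, entry);
        else rankings.add(entry);
        // Sort by count descending, then drop everything beyond the top n.
        rankings.sort((a, b) -> Long.compare((Long) b[1], (Long) a[1]));
        if (rankings.size() > n) rankings.subList(n, rankings.size()).clear();
    }

    public List<Object[]> snapshot() {
        return new ArrayList<>(rankings);
    }
}
```

Note that, as in the original bolt, a word truncated out of the list can re-enter later once its count grows, because the counting bolt keeps emitting updated totals.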
(4) Finally, the MergeObjects bolt consumes the "rank" stream with global grouping, so the "rank" streams of all upstream bolt instances converge on this single instance. Its logic is similar to RankObjects, except that it merges the partial results of the RankObjects bolts into the final global top N. Core code:
public void execute(Tuple tuple, BasicOutputCollector collector) {
    List<List> merging = (List) tuple.getValue(0);
    for (List pair : merging) {
        Integer existingIndex = _find(pair.get(0));
        if (null != existingIndex) {
            _rankings.set(existingIndex, pair);
        } else {
            _rankings.add(pair);
        }
        Collections.sort(_rankings, new Comparator<List>() {
            public int compare(List o1, List o2) {
                return _compare(o1, o2);
            }
        });
        if (_rankings.size() > _count) {
            _rankings.subList(_count, _rankings.size()).clear();
        }
    }
    long currentTime = System.currentTimeMillis();
    if (_lastTime == null || currentTime >= _lastTime + 2000) {
        collector.emit(new Values(new ArrayList(_rankings)));
        LOG.info("Rankings: " + _rankings);
        _lastTime = currentTime;
    }
}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("list"));
}
There is also another clever approach: execute() only inserts data and never emits; instead, prepare() starts a background thread that watches a flag and emits on a timer.
package test.storm.topology;

import test.storm.bolt.WordCounter;
import test.storm.bolt.WordWriter;
import test.storm.spout.WordReader;
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class WordTopN {
    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
        if (args == null || args.length < 1) {
            System.err.println("Usage: N");
            System.err.println("such as : 10");
            System.exit(-1);
        }
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("wordreader", new WordReader(), 2);
        builder.setBolt("wordcounter", new WordCounter(), 2).fieldsGrouping("wordreader", new Fields("word"));
        builder.setBolt("wordwriter", new WordWriter()).globalGrouping("wordcounter");
        Config conf = new Config();
        conf.put("N", args[0]);
        conf.setDebug(false);
        StormSubmitter.submitTopology("topN", conf, builder.createTopology());
    }
}
A few things to note here. The first bolt uses fieldsGrouping, grouping by field. This is important: it guarantees that the same word is always dispatched to the same bolt instance, which applications like word count and top N depend on.
The last bolt uses globalGrouping: all tuples are routed to a single bolt instance for the final aggregation.
To increase parallelism, the spout and the first bolt are both given a parallelism of 2 (my test machine is not very powerful).
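Fields grouping routes a tuple by hashing the value of the grouping field modulo the number of downstream tasks, so identical words always land on the same instance. A minimal sketch of the principle (this illustrates the idea only; it is not Storm's actual internal routing code):

```java
public class FieldsGrouping {
    // Pick the downstream task index for a given grouping-field value.
    public static int taskFor(String word, int numTasks) {
        // Math.floorMod guards against negative hashCode() values.
        return Math.floorMod(word.hashCode(), numTasks);
    }
}
```

Because `hashCode()` is deterministic for equal strings, every occurrence of a word maps to the same task index, which is exactly the property word count and top N need.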
package test.storm.spout;

import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class WordReader extends BaseRichSpout {
    private static final long serialVersionUID = 2197521792014017918L;
    private SpoutOutputCollector collector;
    private static AtomicInteger i = new AtomicInteger();
    private static String[] words = new String[] { "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m",
            "n", "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z" };

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        if (i.intValue() < 100) {
            Random rand = new Random();
            String word = words[rand.nextInt(words.length)];
            collector.emit(new Values(word));
            i.incrementAndGet();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
The spout emits random words, 100 in total. Since the parallelism is 2, two spout instances are created, so the counter is a static AtomicInteger to keep the count thread-safe across instances.
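One caveat worth knowing: the spout's check-then-increment (`i.intValue() < 100` followed by `incrementAndGet()`) is two separate operations, so two threads can both pass the check and emit slightly more than 100 words in total. A compare-and-set loop caps the count exactly. A hypothetical standalone sketch (class and method names are mine) showing the CAS variant with two concurrent "spout" threads:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class SharedCounter {
    static final AtomicInteger i = new AtomicInteger();

    // Run `threads` workers that each try to push the shared counter to `cap`.
    public static int emitUpTo(int cap, int threads) {
        Runnable spout = () -> {
            while (true) {
                int cur = i.get();
                if (cur >= cap) break;
                // compareAndSet only succeeds if no other thread raced past us,
                // so the counter can never overshoot the cap.
                i.compareAndSet(cur, cur + 1);
            }
        };
        Thread[] ts = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            ts[t] = new Thread(spout);
            ts[t].start();
        }
        for (Thread th : ts) {
            try {
                th.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return i.get();
    }
}
```

For this demo's purposes the small overshoot in the original hardly matters, but the distinction is worth keeping in mind when exact counts do.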
package test.storm.bolt;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class WordCounter implements IRichBolt {
    private static final long serialVersionUID = 5683648523524179434L;
    private static Map<String, Integer> counters = new ConcurrentHashMap<String, Integer>();
    private volatile boolean edit = true;

    @Override
    public void prepare(final Map stormConf, TopologyContext context, final OutputCollector collector) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    // If the counters have not changed in 5 seconds,
                    // the spout can be considered finished.
                    if (!edit) {
                        if (counters.size() > 0) {
                            List<Map.Entry<String, Integer>> list = new ArrayList<Map.Entry<String, Integer>>();
                            list.addAll(counters.entrySet());
                            Collections.sort(list, new ValueComparator());
                            // Emit the top N words to the next bolt.
                            for (int i = 0; i < list.size(); i++) {
                                if (i < Integer.parseInt(stormConf.get("N").toString())) {
                                    collector.emit(new Values(list.get(i).getKey() + ":" + list.get(i).getValue()));
                                }
                            }
                        }
                        // Clear the counters after emitting, in case
                        // the spout sends more words later.
                        counters.clear();
                    }
                    edit = false;
                    try {
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();
    }

    @Override
    public void execute(Tuple tuple) {
        String str = tuple.getString(0);
        if (counters.containsKey(str)) {
            Integer c = counters.get(str) + 1;
            counters.put(str, c);
        } else {
            counters.put(str, 1);
        }
        edit = true;
    }

    private static class ValueComparator implements Comparator<Map.Entry<String, Integer>> {
        @Override
        public int compare(Entry<String, Integer> entry1, Entry<String, Integer> entry2) {
            return entry2.getValue() - entry1.getValue();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word_count"));
    }

    @Override
    public void cleanup() {
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
WordCounter stores each word and its count in a thread-safe ConcurrentHashMap. prepare() starts a thread that polls the edit flag every 5 seconds. When edit is false, execute() has not run during the interval and the map has stopped changing, so the spout can be considered finished and the top N can be sorted and emitted. The edit flag is declared volatile (recall when volatile alone is sufficient: when writes do not depend on the variable's current value; here we only ever assign true or false, so there is no read-modify-write dependency).
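Stripped of the Storm plumbing, the volatile-flag quiescence check looks like this (hypothetical class for illustration; in the real bolt, `record` corresponds to execute() and `checkQuiet` is what the watcher thread evaluates every 5 seconds):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class QuiescenceDetector {
    private final ConcurrentMap<String, Integer> counters = new ConcurrentHashMap<>();
    private volatile boolean edit = false;

    // Called on every incoming word, like execute() in the bolt.
    public void record(String word) {
        counters.merge(word, 1, Integer::sum);
        edit = true;  // mark that new data arrived this interval
    }

    // Called once per interval by the watcher thread. Returns true when
    // no record() call happened since the previous check.
    public boolean checkQuiet() {
        boolean quiet = !edit;
        edit = false;  // reset the flag for the next interval
        return quiet;
    }
}
```

The volatile qualifier guarantees the watcher thread sees the flag writes made by the executor thread; since both writes are plain assignments of true/false, no compare-and-set is needed.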
package test.storm.bolt;

import java.io.FileWriter;
import java.io.IOException;
import java.util.Map;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class WordWriter extends BaseBasicBolt {
    private static final long serialVersionUID = -6586283337287975719L;
    private FileWriter writer = null;

    public WordWriter() {
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        try {
            writer = new FileWriter("/data/tianzhen/output/" + this);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String s = input.getString(0);
        try {
            writer.write(s);
            writer.write("\n");
            writer.flush();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Do not close the writer here: execute() keeps running.
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}
The last bolt does the global aggregation. I cut a corner here and wrote the results straight to a file, skipping the final top-N truncation; since I only have one supervisor node, the result is still correct.
Reference links:
http://blog.itpub.net/28912557/viewspace-1579860/
http://www.cnblogs.com/panfeng412/archive/2012/06/16/storm-common-patterns-of-streaming-top-n.html