Storm里面fieldsGrouping和Field的概念詳解


這個Field通常和fieldsGrouping分組機制一起使用,這個Field特別難理解,我自己也是在網上看了好多文章,感覺依舊講的不是很清楚,是似而非,沒有抓到重點。這個問題足足困擾了我3-4天時間,一直理解不了Field的概念,

當前我覺得new Fields("word")就相當於表的表頭,就是定義這個域,這個域里面放的東西,是emit進去的

如果在declareOutputFields方法中new Fields("word1","word2")有2個及以上的fields,則在emit數據時new Value要與其對應(相當於key與value的關系),然后在topology組裝時,fieldsGrouping中的new Fields()可以為new Fields("word1")或new Fields("word2")或new Fields("word1",”word2")來指定接受上游spout或bolt的哪些fields

官方文檔里有這么一句話:“if the stream is grouped by the “user-id” field, tuples with the same “user-id” will always go to the same task”

一個task就是一個處理邏輯的實例,所以fields能根據tuple stream的id,也就是下面定義的xxx
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("xxx"));
}
xxx所代表的具體內容會由某一個task來處理,並且同一個xxx對應的內容,處理這個內容的task實例是同一個。

比如說:

bolt第一次emit三個流,即xxx有luonq pangyang qinnl三個值,假設分別建立三個task實例來處理:

luonq -> instance1
pangyang -> instance2
qinnl -> instance3

然后第二次emit四個流,即xxx有luonq qinnanluo py pangyang四個值,假設還是由剛才的三個task實例來處理:
luonq -> instance1
qinnanluo -> instance2
py -> instance3
pangyang -> instance2

然后第三次emit兩個流,即xxx有py qinnl兩個值,假設還是由剛才的三個task實例來處理:
py -> instance3
qinnl -> instance3

最后我們看看三個task實例都處理了哪些值,分別處理了多少次:

instance1: luonq(處理2次)
instance2: pangyang(處理2次) qinnanluo(處理1次)
instance3: qinnl(處理2次) py(處理2次)

結論:
1. emit發出的值第一次由哪個task實例處理是隨機的,此后再次出現這個值,就固定由最初處理他的那個task實例再次處理,直到topology結束

2. 一個task實例可以處理多個emit發出的值

3. 和shuffle Grouping的區別就在於,shuffle Grouping當emit發出同樣的值時,處理他的task是隨機的


例子1:
第一步:定義了一個表頭
public void declareOutputFields(OutputFieldsDeclarer declarer)
    {
        declarer.declare(new Fields("word"));
    }
第二步:往這個Field空間里面emit進去內容(可以是Bolt和Spolt)
public void execute(Tuple input, BasicOutputCollector collector)
    {
        String sentence = input.getString(0);
        String[] words = sentence.split(" ");
        for (String word : words)
        {
            word = word.trim();
            if (!word.isEmpty())
            {
                word = word.toLowerCase();
                collector.emit(new Values(word));
            }
        }
    }
第三步:關聯步驟
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word-reader",new WordReader());
builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader");
Integer number = 2;
builder.setBolt("word-counter", new WordCounter(), 4).fieldsGrouping("word-normalizer", new Fields("word"));

第四步:
最終實現的結果:
Field:Word
            the
            sporm
            is
            ...

例子2:

第一步:
public void declareOutputFields(OutputFieldsDeclarer declarer)
{
      declarer.declare(new Fields("word", "count"));
}

第二步:
public void execute(Tuple tuple, BasicOutputCollector collector)
 {
            String word = tuple.getString(0);
            Integer count = counts.get(word);
            if (count == null)
                count = 0;
            count++;
            counts.put(word, count);
            collector.emit(new Values(word, count));
}
第三步:
Fields("word", "count")
            “is”,1
            “sporm”,3
            “the”,2
              .....
例子3:
D:\.....\Workspaces\MyEclipse 8.5\bigData\examples-ch06-real-life-app-master\src\main\java\storm\analytics\....
第一步:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("read-feed", new UsersNavigationSpout(), 3);
builder.setBolt("get-categ", new GetCategoryBolt(), 3).shuffleGrouping("read-feed");
builder.setBolt("user-history", new UserHistoryBolt(), 5).fieldsGrouping("get-categ", new Fields("user"));

第二步:發送者輸出是三個結構體:Fields("user","product", "categ")
GetCategoryBolt.java
public void execute(Tuple input, BasicOutputCollector collector)
 {
        NavigationEntry entry = (NavigationEntry)input.getValue(1);
        if("PRODUCT".equals(entry.getPageType())){
            try {
                String product = (String)entry.getOtherData().get("product");

                // Call the items API to get item information
                Product itm = reader.readItem(product);
                if(itm ==null)
                    return ;

                String categ = itm.getCategory();

                collector.emit(new Values(entry.getUserId(), product, categ));

            } catch (Exception ex) {
                System.err.println("Error processing PRODUCT tuple"+ ex);
                ex.printStackTrace();
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("user","product", "categ"));
    }

第三步:new Fields("user"))只取Fields("user","product", "categ"))中的User
builder.setBolt("user-history", new UserHistoryBolt(), 5).fieldsGrouping("get-categ", new Fields("user"));
---------------------
作者:VessalasdXZ
來源:CSDN
原文:https://blog.csdn.net/vessalasd1/article/details/50472123
版權聲明:本文為博主原創文章,轉載請附上博文鏈接!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM