這個Field通常和fieldsGrouping分組機制一起使用,這個Field特別難理解,我自己也是在網上看了好多文章,感覺依舊講的不是很清楚,是似而非,沒有抓到重點。這個問題足足困擾了我3-4天時間,一直理解不了Field的概念,
當前我覺得new Fields("word")就相當於表的表頭,就是定義這個域,這個域里面放的東西,是emit進去的
如果在declareOutputFields方法中new Fields("word1","word2")有2個及以上的fields,則在emit數據時new Value要與其對應(相當於key與value的關系),然后在topology組裝時,fieldsGrouping中的new Fields()可以為new Fields("word1")或new Fields("word2")或new Fields("word1",”word2")來指定接受上游spout或bolt的哪些fields
官方文檔里有這么一句話:“if the stream is grouped by the “user-id” field, tuples with the same “user-id” will always go to the same task”
一個task就是一個處理邏輯的實例,所以fields能根據tuple stream的id,也就是下面定義的xxx
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("xxx"));
}
xxx所代表的具體內容會由某一個task來處理,並且同一個xxx對應的內容,處理這個內容的task實例是同一個。
比如說:
bolt第一次emit三個流,即xxx有luonq pangyang qinnl三個值,假設分別建立三個task實例來處理:
luonq -> instance1
pangyang -> instance2
qinnl -> instance3
然后第二次emit四個流,即xxx有luonq qinnanluo py pangyang四個值,假設還是由剛才的三個task實例來處理:
luonq -> instance1
qinnanluo -> instance2
py -> instance3
pangyang -> instance2
然后第三次emit兩個流,即xxx有py qinnl兩個值,假設還是由剛才的三個task實例來處理:
py -> instance3
qinnl -> instance3
最后我們看看三個task實例都處理了哪些值,分別處理了多少次:
instance1: luonq(處理2次)
instance2: pangyang(處理2次) qinnanluo(處理1次)
instance3: qinnl(處理2次) py(處理2次)
結論:
1. emit發出的值第一次由哪個task實例處理是隨機的,此后再次出現這個值,就固定由最初處理他的那個task實例再次處理,直到topology結束
2. 一個task實例可以處理多個emit發出的值
3. 和shuffle Grouping的區別就在於,shuffle Grouping當emit發出同樣的值時,處理他的task是隨機的
例子1:
第一步:定義了一個表頭
public void declareOutputFields(OutputFieldsDeclarer declarer)
{
declarer.declare(new Fields("word"));
}
第二步:往這個Field空間里面emit進去內容(可以是Bolt和Spolt)
public void execute(Tuple input, BasicOutputCollector collector)
{
String sentence = input.getString(0);
String[] words = sentence.split(" ");
for (String word : words)
{
word = word.trim();
if (!word.isEmpty())
{
word = word.toLowerCase();
collector.emit(new Values(word));
}
}
}
第三步:關聯步驟
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word-reader",new WordReader());
builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader");
Integer number = 2;
builder.setBolt("word-counter", new WordCounter(), 4).fieldsGrouping("word-normalizer", new Fields("word"));
第四步:
最終實現的結果:
Field:Word
the
sporm
is
...
例子2:
第一步:
public void declareOutputFields(OutputFieldsDeclarer declarer)
{
declarer.declare(new Fields("word", "count"));
}
第二步:
public void execute(Tuple tuple, BasicOutputCollector collector)
{
String word = tuple.getString(0);
Integer count = counts.get(word);
if (count == null)
count = 0;
count++;
counts.put(word, count);
collector.emit(new Values(word, count));
}
第三步:
Fields("word", "count")
“is”,1
“sporm”,3
“the”,2
.....
例子3:
D:\.....\Workspaces\MyEclipse 8.5\bigData\examples-ch06-real-life-app-master\src\main\java\storm\analytics\....
第一步:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("read-feed", new UsersNavigationSpout(), 3);
builder.setBolt("get-categ", new GetCategoryBolt(), 3).shuffleGrouping("read-feed");
builder.setBolt("user-history", new UserHistoryBolt(), 5).fieldsGrouping("get-categ", new Fields("user"));
第二步:發送者輸出是三個結構體:Fields("user","product", "categ")
GetCategoryBolt.java
public void execute(Tuple input, BasicOutputCollector collector)
{
NavigationEntry entry = (NavigationEntry)input.getValue(1);
if("PRODUCT".equals(entry.getPageType())){
try {
String product = (String)entry.getOtherData().get("product");
// Call the items API to get item information
Product itm = reader.readItem(product);
if(itm ==null)
return ;
String categ = itm.getCategory();
collector.emit(new Values(entry.getUserId(), product, categ));
} catch (Exception ex) {
System.err.println("Error processing PRODUCT tuple"+ ex);
ex.printStackTrace();
}
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("user","product", "categ"));
}
第三步:new Fields("user"))只取Fields("user","product", "categ"))中的User
builder.setBolt("user-history", new UserHistoryBolt(), 5).fieldsGrouping("get-categ", new Fields("user"));
---------------------
作者:VessalasdXZ
來源:CSDN
原文:https://blog.csdn.net/vessalasd1/article/details/50472123
版權聲明:本文為博主原創文章,轉載請附上博文鏈接!