在這里,將會提到storm的七種grouping策略,並且編碼逐一實現。
首先,需要一個集群(希望盡量模仿真實環境,故就不用本地模式了)。詳細的安裝方法大家可以查看本人的另外一篇博文:storm集群和zookeeper集群的部署過程。
OK。現在有三個節點。一個作為nimbus,兩個作為supervisor。到這里先介紹一下storm邏輯上有兩個component,一個是Spout,另一個是Bolt。stream由Spout發出,在不同的Bolt之間進行處理,在其中傳遞的是storm的基本處理單位:Tuple。由Spout發出一個一個Tuple,然后Bolt接收Tuple進行各種各樣的處理。這一整個過程構成一個DAG。在storm里面叫做Topology。當使用遠程模式向集群提交一個Topology之后,如果不kill掉的話,將會一直運行到。。。我也不知道盡頭。。貌似沒有盡頭。
好了,來看一個簡單的Topology。將使用這個Topology來實現那幾種Grouping策略。
上圖中spout的處理邏輯是將一句話發出給下一個Bolt,然后下一個Bolt做句子的單詞分割,下一個做計數,最后的Bolt做匯總顯示。這里可以有多個Bolt或者Spout進行並行處理。這是關於並行度的設置。
好了,所謂的grouping策略就是在Spout與Bolt、Bolt與Bolt之間傳遞Tuple的方式。總共有七種方式:
1)shuffleGrouping(隨機分組)
2)fieldsGrouping(按照字段分組,在這里即是同一個單詞只能發送給一個Bolt)
3)allGrouping(廣播發送,即每一個Tuple,每一個Bolt都會收到)
4)globalGrouping(全局分組,將Tuple分配到task id值最低的task里面)
5)noneGrouping(隨機分派)
6)directGrouping(直接分組,指定Tuple與Bolt的對應發送關系)
7)Local or shuffle Grouping
8)customGrouping (自定義的Grouping)
OK,下面逐個來試試!
首先是使用shuffleGrouping策略。
啟動后所得的結果。
可以看到如storm這個單詞被隨機分配到了兩個counter里面,分別是h2和h3兩個節點里面。可再做一次提交,又會看到不一樣的結果。將與下面的fieldGrouping形成對比。
然后換成fieldsGrouping。
啟動后的結果如圖。圖中wordcount2和wordcount3就是我兩次提交的topology的名字。
下面是兩次提交的結果。可以看出,使用fieldsGrouping策略,被分配到每個wordcounterbolt的單詞沒有變化。
下面是第二次提交的結果。
再換成noneGrouping策略。
提交集群運行。
運行結果如圖。noneGrouping和shuffleGrouping是基本一樣的。都是隨機的。
替換成allGrouping策略。
提交集群運行。
運行結果如圖。可以看到。兩個bolt所接收到的單詞是一樣的,都是全部的單詞。
最后,替換成globalGrouping策略。
提交集群運行。
從上圖可以看到,主要分配到了h3這個節點。從下面的結果得以驗證。
好了,就先做到這里吧!剩下的grouping策略需要在修改一個代碼,就下次再做了。本來不想上這么多圖的,不過,不是說有圖有真相嘛!哈哈~