storm 介紹+八種grouping方法


 

Storm主要的應用場景就是流式數據處理,例如實時推薦系統,實時監控系統等。

storm中的相關概念

在storm中,分布式的計算結構指的是一個topology(拓撲),一個topology由流式數據,spouts(流生產者),以及bolts(具體操作者)組成。Storm的topologies和其他的批處理任務系統很類似,例如Hadoop,這類批處理任務都定義了清晰的開始和結束點,然而storm的topologies是永不停息的在運行的,除非殺死或者反部署這個topologies。

Topology:storm都是以topology為單位運行的,topology就相當於網絡中的拓撲圖一樣。

Tuple:tuple是storm結構中的核心數據,一個tuple可以簡單的理解為一系列的的鍵值對(key-value pairs),是storm結構中最小的數據單元。如果你對CEP(complex event processing)熟悉的話,你可以認為tuples就是事件集。

Streams:streams是由無限的tuples組成。

Spouts:spouts代表一個storm topology的數據入口,spouts扮演者適配器的作用,連接着一個個的數據源,並將數據轉換成tuples,同時以數據流的方式發送tuples。數據源的來源有如下幾種:1、網絡或者是移動應用;2、推特或者是微博等社交網絡;3、傳感器輸出;4、應用日志事件。典型的spouts不會實現任何的特定業務邏輯,所以spouts可以經常被重復交叉的被多個topologies使用

Bolts:bolts可以想象成計算的操作者或者是一個函數,他們可以接收任意的數據流或者被處理過的數據,而且還可以隨意的發送一個或多個tuples,bolts可以訂閱spouts或者是其他bolts發送過來的數據流,bolts可以創造一個復雜的數據傳輸網絡。bolts的典型作用如下:1、過濾tuples;2、連接或者是聚合;3、計算

一個簡單的topology如下圖所示:

 

cleanup()方法,該方法只有在本地模式下才起作用,在集群模式下,是不起作用的,由於我們是在本地測試,所以我們使用的是storm的本地模式,storm的本地模式對我們的開發,測試,調試有很大的幫助作用,在我們部署成集群模式之前,我們可以充分的發揮本地模式的功能,在本地模式下,kill和關閉topology的時候,會調用這個cleanup()方法,從而實現我們打印統計結果的需求。

 

 

1. Shuffle Grouping 
隨機分組,隨機派發stream里面的tuple,保證每個bolt task接收到的tuple數目大致相同。
輪詢,平均分配 

2. Fields Grouping(相同fields去分發到同一個Bolt)
按字段分組,比如,按"user-id"這個字段來分組,那么具有同樣"user-id"的 tuple 會被分到相同的Bolt里的一個task, 而不同的"user-id"則可能會被分配到不同的task。 

3. All Grouping
廣播發送,對於每一個tuple,所有的bolts都會收到 

4. Global Grouping
全局分組,把tuple分配給task id最低的task 。

5. None Grouping
不分組,這個分組的意思是說stream不關心到底怎樣分組。目前這種分組和Shuffle grouping是一樣的效果。 有一點不同的是storm會把使用none grouping的這個bolt放到這個bolt的訂閱者同一個線程里面去執行(未來Storm如果可能的話會這樣設計)。 

6. Direct Grouping
指向型分組, 這是一種比較特別的分組方法,用這種分組意味着消息(tuple)的發送者指定由消息接收者的哪個task處理這個消息。只有被聲明為 Direct Stream 的消息流可以聲明這種分組方法。而且這種消息tuple必須使用 emitDirect 方法來發射。消息處理者可以通過 TopologyContext 來獲取處理它的消息的task的id (OutputCollector.emit方法也會返回task的id)  

7. Local or shuffle grouping
本地或隨機分組。如果目標bolt有一個或者多個task與源bolt的task在同一個工作進程中,tuple將會被隨機發送給這些同進程中的tasks。否則,和普通的Shuffle Grouping行為一致

8.customGrouping
自定義,相當於mapreduce那里自己去實現一個partition一樣。

總結:前4種用的多些,后面4種用的少些。

 

1. Shuffle Grouping 
隨機分組,隨機派發stream里面的tuple,保證每個bolt task接收到的tuple數目大致相同。
輪詢,平均分配 

2. Fields Grouping(相同fields去分發到同一個Bolt)
按字段分組,比如,按"user-id"這個字段來分組,那么具有同樣"user-id"的 tuple 會被分到相同的Bolt里的一個task, 而不同的"user-id"則可能會被分配到不同的task。 

3. All Grouping
廣播發送,對於每一個tuple,所有的bolts都會收到 

4. Global Grouping
全局分組,把tuple分配給task id最低的task 。

5. None Grouping
不分組,這個分組的意思是說stream不關心到底怎樣分組。目前這種分組和Shuffle grouping是一樣的效果。 有一點不同的是storm會把使用none grouping的這個bolt放到這個bolt的訂閱者同一個線程里面去執行(未來Storm如果可能的話會這樣設計)。 

6. Direct Grouping
指向型分組, 這是一種比較特別的分組方法,用這種分組意味着消息(tuple)的發送者指定由消息接收者的哪個task處理這個消息。只有被聲明為 Direct Stream 的消息流可以聲明這種分組方法。而且這種消息tuple必須使用 emitDirect 方法來發射。消息處理者可以通過 TopologyContext 來獲取處理它的消息的task的id (OutputCollector.emit方法也會返回task的id)  

7. Local or shuffle grouping
本地或隨機分組。如果目標bolt有一個或者多個task與源bolt的task在同一個工作進程中,tuple將會被隨機發送給這些同進程中的tasks。否則,和普通的Shuffle Grouping行為一致

8.customGrouping
自定義,相當於mapreduce那里自己去實現一個partition一樣。

總結:前4種用的多些,后面4種用的少些。

 

 

1. builder.setBolt("bolt", new MyBolt(), 2).allGrouping("spout");//兩個spot並行 所有都分發

 

 

 2. builder.setBolt("bolt", new MyBolt(), 2).shuffleGrouping("spout")其實就是隨機往下游去發,不自覺的做到了負載均衡

3.builder.setBolt("bolt", new MyBolt(), 2).fieldsGrouping("spout", new Fields("session_id")); // fieldsGrouping其實就是MapReduce里面理解的Shuffle,根據fields求hash來取模,相同的名稱的fields分發到一個bolt里面。

4.builder.setBolt("bolt", new MyBolt(), 2).globalGrouping("spout"); // 只往一個里面發,往taskId小的那個里面去發送

 

為什么要用group?

栗子:


builder.setBolt(SPLIT_BOLT_ID, splitBolt).fieldsGrouping(SENTENCE_SPOUT_ID, new Fields("sentence"))
/*
* SplitSentenceBolt --> WordCountBolt
* 注意,此處需要使用fieldsGrouping來分組,要不然統計的數據會不准,例如一個Bolt中接收到{"word":"dog","count":"1"}
* 然后又來了一個{"word":"dog","count":"1"},但是又沒有發送到同一個Bolt中,那么就會重新統計
*/

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM