關於storm的Spout、Bolt、及其可靠性


本文導讀:

Component全家譜結構圖
Spout分析
  ——類圖
  ——分析(接口實現)、結論
  ——可靠的與不可靠的消息(推薦
Bolt分析 
  ——類圖
  ——分析(接口實現)、結論
  ——可靠的與不可靠的Bolt(推薦

 
 

Component:

  Storm中,Spout和Bolt都是其Component,所以,Storm定義了一個名叫IComponent的總接口。

其全家譜結構圖如下: 


  綠色部分是我們最常用、比較簡單的部分。紅色部分是與事務相關的,在以后的文章會具體講解。
 
  BaseComponent 是Storm提供的“偷懶”的類。為什么這么說呢,它及其子類,都或多或少實現了其接口定義的部分方法。這樣我們在用的時候,可以直接繼承該類,而不是自己每次都寫所有的方法。但值得一提的是,BaseXXX這種定義的類,它所實現的方法,都是空的,直接返回null
 

Spout:

   假設我們實現一個extendsBaseRichSpout的RandomSpout類,隨機發射數據,那么它的類圖如下所示:
分析:
   Spout的最頂層抽象是ISpout接口。
  • open方法是初始化動作。允許你在該spout初始化時做一些動作,傳入了上下文,方便取上下文的一些數據。
  • close方法在該spout關閉前執行,但是並不能得到保證其一定被執行。spout是作為task運行在worker內,在cluster模式下,supervisor會直接kill -9 woker的進程,這樣它就無法執行了。而在本地模式下,只要不是kill -9, 如果是發送停止命令,是可以保證close的執行的。
  • activatedeactivate :一個spout可以被暫時激活和關閉,這兩個方法分別在對應的時刻被調用。
  • nextTuple 用來發射數據。
  • ack(Object)傳入的Object其實是一個id,唯一表示一個tuple。該方法是這個id所對應的tuple被成功處理后執行。
  • fail(Object)同ack,只不過是tuple處理失敗時執行。
  我們的RandomSpout 由於繼承了BaseRichSpout,所以不用實現close、activate、deactivate、ack、fail和getComponentConfiguration方法,只關心最基本核心的部分。
結論:
   通常情況下(Shell和事務型的除外),實現一個Spout,可以直接實現接口IRichSpout,如果不想寫多余的代碼,可以直接繼承BaseRichSpout
 
附注Storm可靠的與不可靠的消息 (分析+實例

 

Bolt:

   假設我們實現一個extendsBaseBasicBolt的ExclaimBasicSpout類,處理數據,那么它的類圖如下所示:
疑問:為什么IBasicBolt沒有繼承IBolt呢?我們先往下看......
分析:
  IBolt定義了三個方法:
  • IBolt繼承了java.io.Serializable,我們在nimbus上提交了topology以后,創建出來的bolt會序列化后發送到具體執行的worker上去。worker在執行該Bolt時,會先調用prepare方法傳入當前執行的上下文
  • execute接受一個tuple進行處理,並用prepare方法傳入的OutputCollector的ack方法(表示成功)或fail(表示失敗)來反饋處理結果。
  • cleanup 同ISpout的close方法,在關閉前調用。同樣不保證其一定執行。
備注:
   紅色部分是Bolt實現需要注意的地方。Storm提供了IBasicBolt接口,其目的就是實現該接口的Bolt不用在代碼中提供反饋結果了,Storm內部會自動反饋成功。如果確實需要反饋失敗,可以拋出FailedException
結論:
  通常情況下,實現一個Bolt,可以實現IRichBolt接口或繼承BaseRichBolt;如果不想自己處理結果反饋,可以實現IBasicBolt接口或繼承BaseBasicBolt,它實際上相當於自動做掉了prepare方法和collector.emit.ack(inputTuple)。
 
附注:Storm可靠的與不可靠的Bolt (分析+實例

 
 

補充——RichBolt vs BasicBolt

直接用BasicBolt,會在execute()后自動ack/fail Tuple,而RichBolt則需要自行調用ack/fail。

那什么時候使用RichBolt? Bolt不是在每次execute()時立刻產生新消息,需要異步的發送新消息(比如聚合一段時間的數據再發送)時,又或者想異步的ack/fail原消息時就需要。

BasicBolt的prepare()里並沒有collector參數,只在每次execute()時傳入collector。而RichBolt剛好相反,你可以在初始化時就把collector保存起來,用它在任意時候發送消息。

另外,如果用RichBolt的collector,還要考慮在發送消息時是否帶上傳入的Tuple,如果不帶,則下游的處理節點出錯也不會回溯到Spout重發。用BasicBolt則已默認帶上。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM