Storm系列(十八)事務介紹


功能:將多個tuple組合成為一個批次,並保障每個批次的tuple被且僅被處理一次。

storm事務處理中,把一個批次的tuple的處理分為兩個階段processing和commit階段。

  • processing階段運行多個批次的tuple並行處理。
  • commit階段各批次之間需強制按照順序進行提交。

 

事務Topologies

在Transactional Topologies內部主要管理以下事情:

  1. 管理狀態: 把所有實現Transactional Topologies所必須的狀態保存在zookeeper里面,包括當前transaction id及定義每個batch的一些元數據。
  2. 協調事務: 決定在任何一個時間點是該proccessing還是該committing。
  3. 錯誤檢測: 利用acking框架來高效地檢測什么時候一個batch被成功處理了,被成功提交了,或者失敗了。Storm然后會相應地replay對應的batch。不需要手動做任何acking或者anchoring (emit時發生的動作)。
  4. 中間數據清理:決定什么時候一個bolt接收到一個特定transaction的所有tuple。Storm同時也會自動清理每個transaction所產生的中間數據。

 

事務Topologies的實現

Spout

事務性的spout需要實現ITransactionalSpout,這個接口包含兩個內部接口類Coordinator和Emitter。在topology運行的時候,事務性的spout內部包含一個子Topology.這里面有兩種類型的tuple,一種是事務性的tuple,一種是batch中的tuple.

coordinator用於開啟一個事務,並在准備進入一個事務的processing階段時,發射一個事務性 tuple(transactionAttempt & metadata)到”batch emit”流,coordinator只有一個,emitter根據並行度可以有多個實例.

Emitter以all grouping(廣播)的方式訂閱coordinator的”batch emit”流,負責為每個batch實際發射tuple。發送的tuple都必須以TransactionAttempt作為第一個field,storm根據這個field來判斷tuple屬於哪一個batch。

coordinator與Emitter關系結構圖

image

 

TransactionAttempt

TransactionAttempt中包含兩個值:一個transaction id,一個attempt id。transaction id的作用就是我們上面介紹的對於每個batch中的tuple是唯一的,而且不管這個batch replay多少次都是一樣的。

attempt id是對於每個batch唯一的一個id, 但是對於同一個batch,它replay之后的attempt id跟replay之前就不一樣了,storm利用這個id來區別一個batch發射的tuple的不同版本。

事務性Bolt

BaseTransactionalBolt

  • 處理batch在一起的tuples,對於每一個tuple調用execute方 法,而在整個batch處理(processing)完成的時候調用finishBatch方法。如果BatchBolt被標記成committer,則 只能在commit階段調用finishBatch方法。一個batch的commit階段由storm保證只在前一個batch成功提交之后才會執行。並且它會重試直到topology里面的所有bolt在commit完成提交。那么如何知道batch的processing完成了,也就是bolt是否接收處理了batch里面所有的tuple,在bolt內部有一個 CoordinatedBolt的模型。
  • 被標記成committer的BatchBolt需要實現ICommitter接口或者通過TransactionalTopologyBuilder的setCommitterBolt方法把BatchBolt添加到topology。

CoordinateBolt具體原理如下:

image

 

CoordinateBolt

  • 每個CoordinateBolt記錄兩個值:有哪些task給我發送了tuple(根據topology的grouping信息);我要給哪些task發送信息(同樣根據groping信息)。
  • 等所有的tuple都發送完了之后,CoordinateBolt通過另外一個特殊的stream以emitDirect的方式告訴所有它發送過 tuple的task,它發送了多少tuple給這個task。下游task會將這個數字和自己已經接收到的tuple數量做對比,如果相等,則說明處理 完了所有的tuple。
  • 下游CoordinateBolt會重復上面的步驟,通知其下游。

事務內部處理流程圖

image


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM