storm消息容錯機制(ack-fail)
1、介紹
- 在storm中,可靠的信息處理機制是從spout開始的。
- 一個提供了可靠的處理機制的spout需要記錄他發射出去的tuple,當下游bolt處理tuple或者子tuple失敗時spout能夠重新發射。
- Storm通過調用Spout的nextTuple()發送一個tuple。為實現可靠的消息處理,首先要給每個發出的tuple帶上唯一的ID,並且將ID作為參數傳遞給SpoutOutputCollector的emit()方法:collector.emit(new Values("value1","value2"), msgId); messageid就是用來標示唯一的tuple的,而rootid是隨機生成的。
給每個tuple指定ID告訴Storm系統,無論處理成功還是失敗,spout都要接收tuple樹上所有節點返回的通知。如果處理成功,spout的ack()方法將會對編號是msgId的消息應答確認;如果處理失敗或者超時,會調用fail()方法。
2、基本實現
Storm 系統中有一組叫做"acker"的特殊的任務,它們負責跟蹤DAG(有向無環圖)中的每個消息。
acker任務保存了spout id到一對值的映射。第一個值就是spout的任務id,通過這個id,acker就知道消息處理完成時該通知哪個spout任務。第二個值是一個64bit的數字,我們稱之為"ack val", 它是樹中所有消息的隨機id的異或計算結果。
ack val表示了整棵樹(tuple樹)的的狀態,無論這棵樹多大,只需要這個固定大小的數字就可以跟蹤整棵樹。當消息被創建和被應答的時候都會有相同的消息id發送過來做異或。 每當acker發現一棵樹的ack val值為0的時候,它就知道這棵樹已經被完全處理了。
3、可靠性配置
有三種方法可以去掉消息的可靠性:
將參數Config.TOPOLOGY_ACKERS設置為0,通過此方法,當Spout發送一個消息的時候,它的ack方法將立刻被調用;
Spout發送一個消息時,不指定此消息的messageID。當需要關閉特定消息可靠性的時候,可以使用此方法;
最后,如果你不在意某個消息派生出來的子孫消息的可靠性,則此消息派生出來的子消息在發送時不要做錨定,即在emit方法中不指定輸入消息。因為這些子孫消息沒有被錨定在任何tuple tree中,因此他們的失敗不會引起任何spout重新發送消息。