正在學習storm的大兄弟們,我又來傳道授業解惑了,是不是覺得自己會用ack了。好吧,那就讓我開始啪啪打你們臉吧。
先說一下ACK機制:
為了保證數據能正確的被處理, 對於spout產生的每一個tuple, storm都會進行跟蹤。
這里面涉及到ack/fail的處理,如果一個tuple處理成功是指這個Tuple以及這個Tuple產生的所有Tuple都被成功處理, 會調用spout的ack方法;
如果失敗是指這個Tuple或這個Tuple產生的所有Tuple中的某一個tuple處理失敗, 則會調用spout的fail方法;
在處理tuple的每一個bolt都會通過OutputCollector來告知storm, 當前bolt處理是否成功。
另外需要注意的,當spout觸發fail動作時,不會自動重發失敗的tuple,需要我們在spout中重新獲取發送失敗數據,手動重新再發送一次。
Ack原理
Storm中有個特殊的task名叫acker,他們負責跟蹤spout發出的每一個Tuple的Tuple樹(因為一個tuple通過spout發出了,經過每一個bolt處理后,會生成一個新的tuple發送出去)。當acker(框架自啟動的task)發現一個Tuple樹已經處理完成了,它會發送一個消息給產生這個Tuple的那個task。
Acker的跟蹤算法是Storm的主要突破之一,對任意大的一個Tuple樹,它只需要恆定的20字節就可以進行跟蹤。
Acker跟蹤算法的原理:acker對於每個spout-tuple保存一個ack-val的校驗值,它的初始值是0,然后每發射一個Tuple或Ack一個Tuple時,這個Tuple的id就要跟這個校驗值異或一下,並且把得到的值更新為ack-val的新值。那么假設每個發射出去的Tuple都被ack了,那么最后ack-val的值就一定是0。Acker就根據ack-val是否為0來判斷是否完全處理,如果為0則認為已完全處理。
要實現ack機制:
1,spout發射tuple的時候指定messageId
2,spout要重寫BaseRichSpout的fail和ack方法
3,spout對發射的tuple進行緩存(否則spout的fail方法收到acker發來的messsageId,spout也無法獲取到發送失敗的數據進行重發),看看系統提供的接口,只有msgId這個參數,這里的設計不合理,其實在系統里是有cache整個msg的,只給用戶一個messageid,用戶如何取得原來的msg貌似需要自己cache,然后用這個msgId去查詢,太坑爹了
3,spout根據messageId對於ack的tuple則從緩存隊列中刪除,對於fail的tuple可以選擇重發。
4,設置acker數至少大於0;Config.setNumAckers(conf, ackerParal);
Storm的Bolt有BsicBolt和RichBolt:
在BasicBolt中,BasicOutputCollector在emit數據的時候,會自動和輸入的tuple相關聯,而在execute方法結束的時候那個輸入tuple會被自動ack。
使用RichBolt需要在emit數據的時候,顯示指定該數據的源tuple要加上第二個參數anchor tuple,以保持tracker鏈路,即collector.emit(oldTuple, newTuple);並且需要在execute執行成功后調用OutputCollector.ack(tuple), 當失敗處理時,執行OutputCollector.fail(tuple);
由一個tuple產生一個新的tuple稱為:anchoring,你發射一個tuple的同時也就完成了一次anchoring。
ack機制即,spout發送的每一條消息,在規定的時間內,spout收到Acker的ack響應,即認為該tuple 被后續bolt成功處理;在規定的時間內(默認是30秒),沒有收到Acker的ack響應tuple,就觸發fail動作,即認為該tuple處理失敗,timeout時間可以通過Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS來設定。或者收到Acker發送的fail響應tuple,也認為失敗,觸發fail動作
注意,我開始以為如果繼承BaseBasicBolt那么程序拋出異常,也會讓spout進行重發,但是我錯了,程序直接異常停止了
這里我以分布式程序入門案例worldcount為例子吧。請看下面大屏幕:沒有錯我就是那個你們走在路上經常聽見的名字劉洋。
這里spout1-1task發送句子"i am liu yang"給bolt2-2task進行處理,該task把句子切分為單詞,根據字段分發到下一個bolt中,bolt2-2,bolt4-4,bolt5-5對每一個單詞添加一個后綴1后再發送給下一個bolt進行存儲到數據庫的操作,這個時候bolt7-7task在存儲數據到數據庫時失敗,向spout發送fail響應,這個時候spout收到消息就會再次發送的該數據。
好,那么我思考一個問題:spout如何保證再次發送的數據就是之前失敗的數據,所以在spout實例中,絕對要定義一個map緩存,緩存發出去的每一條數據,key當然就是messageId,當spout實例收到所有bolt的響應后如果是ack,就會調用我們重寫的ack方法,在這個方法里面我們就要根據messageId刪除這個key-value,如果spout實例收到所有bolt響應后,發現是faile,則會調用我們重寫的fail方法,根據messageId查詢到對應的數據再次發送該數據出去。
spout代碼如下
public class MySpout extends BaseRichSpout {
private static final long serialVersionUID = 5028304756439810609L; // key:messageId,Data private HashMap<String, String> waitAck = new HashMap<String, String>(); private SpoutOutputCollector collector; public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("sentence")); } public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; } public void nextTuple() { String sentence = "i am liu yang"; String messageId = UUID.randomUUID().toString().replaceAll("-", ""); waitAck.put(messageId, sentence); //指定messageId,開啟ackfail機制 collector.emit(new Values(sentence), messageId); } @Override public void ack(Object msgId) { System.out.println("消息處理成功:" + msgId); System.out.println("刪除緩存中的數據..."); waitAck.remove(msgId); } @Override public void fail(Object msgId) { System.out.println("消息處理失敗:" + msgId); System.out.println("重新發送失敗的信息..."); //重發如果不開啟ackfail機制,那么spout的map對象中的該數據不會被刪除的。 collector.emit(new Values(waitAck.get(msgId)),msgId); } }
雖然在storm項目中我們的spout源通常來源kafka,而且我們使用storm提供的工具類KafkaSpout類,其實這個類里面就維護者<messageId,Tuple>對的集合。
問題一:你們有沒有想過如果某一個task節點處理的tuple一直失敗,消息一直重發會怎么樣?
我們都知道,spout作為消息的發送源,在沒有收到該tuple來至左右bolt的返回信息前,是不會刪除的,那么如果消息一直失敗,就會導致spout節點存儲的tuple數據越來越多,導致內存溢出。
問題二:有沒有想過,如果該tuple的眾多子tuple中,某一個子tuple處理failed了,但是另外的子tuple仍然會繼續執行,如果子tuple都是執行數據存儲操作,那么就算整個消息失敗,那些生成的子tuple還是會成功執行而不會回滾的。
這個時候storm的原生api是無法支持這種事務性操作,我們可以使用storm提供的高級api-trident來做到(具體如何我不清楚,目前沒有研究它,但是我可以它內部一定是根據分布式協議比如兩階段提交協議等)。向這種業務中要保證事務性功能,我們完全可以根據我們自身的業務來做到,比如這里的入庫操作,我們先記錄該消息是否已經入庫的狀態,再入庫時查詢狀態來決定是否給予執行。
問題三:tuple的追蹤並不一定要是從spout結點到最后一個bolt,只要是spout開始,可以在任意層次bolt停止追蹤做出應答。
Acker task 組件來設置一個topology里面的acker的數量,默認值是一,如果你的topoogy里面的tuple比較多的話,那么請把acker的數量設置多一點,效率會更高一點。
acker task是非常輕量級的, 所以一個topology里面不需要很多acker。你可以通過Strom UI(id: -1)來跟蹤它的性能。 如果它的吞吐量看起來不正常,那么你就需要多加點acker了。
如果可靠性對你來說不是那么重要 — 你不太在意在一些失敗的情況下損失一些數據, 那么你可以通過不跟蹤這些tuple樹來獲取更好的性能。不去跟蹤消息的話會使得系統里面的消息數量減少一半, 因為對於每一個tuple都要發送一個ack消息。並且它需要更少的id來保存下游的tuple, 減少帶寬占用。
有三種方法可以去掉可靠性。
第二個方法是在tuple層面去掉可靠性。 你可以在發射tuple的時候不指定messageid來達到不跟粽某個特定的spout tuple的目的。
最后一個方法是如果你對於一個tuple樹里面的某一部分到底成不成功不是很關心,那么可以在發射這些tuple的時候unanchor它們。 這樣這些tuple就不在tuple樹里面, 也就不會被跟蹤了。
可靠性配置
有三種方法可以去掉消息的可靠性:
將參數Config.TOPOLOGY_ACKERS設置為0,通過此方法,當Spout發送一個消息的時候,它的ack方法將立刻被調用;
Spout發送一個消息時,不指定此消息的messageID。當需要關閉特定消息可靠性的時候,可以使用此方法;
最后,如果你不在意某個消息派生出來的子孫消息的可靠性,則此消息派生出來的子消息在發送時不要做錨定,即在emit方法中不指定輸入消息。因為這些子孫消息沒有被錨定在任何tuple tree中,因此他們的失敗不會引起任何spout重新發送消息。
如何關閉Ack機制
有2種途徑
spout發送數據是不帶上msgid
設置acker數等於0
值得注意的一點是Storm調用Ack或者fail的task始終是產生這個tuple的那個task,所以如果一個Spout,被分為很多個task來執行,消息執行的成功失敗與否始終會通知最開始發出tuple的那個task。
作為Storm的使用者,有兩件事情要做以更好的利用Storm的可靠性特征,首先你在生成一個tuple的時候要通知Storm,其次,完全處理一個tuple之后要通知Storm,這樣Storm就可以檢測到整個tuple樹有沒有完成處理,並且通知源Spout處理結果。
1 由於對應的task掛掉了,一個tuple沒有被Ack:
Storm的超時機制在超時之后會把這個tuple標記為失敗,從而可以重新處理。
2 Acker掛掉了: 在這種情況下,由這個Acker所跟蹤的所有spout tuple都會出現超時,也會被重新的處理。
3 Spout 掛掉了:在這種情況下給Spout發送消息的消息源負責重新發送這些消息。
三個基本的機制,保證了Storm的完全分布式,可伸縮的並且高度容錯的。
另外Ack機制還常用於限流作用: 為了避免spout發送數據太快,而bolt處理太慢,常常設置pending數,當spout有等於或超過pending數的tuple沒有收到ack或fail響應時,跳過執行nextTuple, 從而限制spout發送數據。
通過conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, pending);設置spout pend數。