Storm入門(六)深入理解可靠性機制


轉自http://blog.csdn.net/zhangzhebjut/article/details/38467145

一 可靠性簡介

       Storm的可靠性是指Storm會告知用戶每一個消息單元是否在一個指定的時間(timeout)內被完全處理。完全處理的意思是該MessageId綁定的源Tuple以及由該源Tuple衍生的所有Tuple都經過了Topology中每一個應該到達的Bolt的處理。

注: timetout 可以通過Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS 來指定

       Storm中的每一個Topology中都包含有一個Acker組件。Acker組件的任務就是跟蹤從某個task中的Spout流出的每一個messageId所綁定的Tuple樹中的所有Tuple的處理情況。如果在用戶設置的最大超時時間內這些Tuple沒有被完全處理,那么Acker會告訴Spout該消息處理失敗,相反則會告知Spout該消息處理成功,它會分別調用Spout中的fail和ack方法。

Storm允許用戶在Spout中發射一個新的源Tuple時為其指定一個MessageId,這個MessageId可以是任意的Object對象。多個源Tuple可以共用同一個MessageId,表示這多個源Tuple對用戶來說是同一個消息單元,它們會被放到同一棵tuple樹中,如下圖所示:

           Tuple 樹

       在Spout中由message 1綁定的tuple1和tuple2分別經過bolt1和bolt2的處理,然后生成了兩個新的Tuple,並最終流向了bolt3。當bolt3處理完之后,稱message 1被完全處理了。

二 Acker 原理分析

       storm里面有一類特殊的task稱為acker(acker bolt), 負責跟蹤spout發出的每一個tuple的tuple樹。當acker發現一個tuple樹已經處理完成了。它會發送一個消息給產生這個tuple的那個task。你可以通過Config.TOPOLOGY_ACKERS來設置一個topology里面的acker的數量, 默認值是1。 如果你的topology里面的tuple比較多的話, 那么把acker的數量設置多一點,效率會高一點。

       理解storm的可靠性的最好的方法是來看看tuple和tuple樹的生命周期, 當一個tuple被創建, 不管是spout還是bolt創建的, 它會被賦予一個64位的id,而acker就是利用這個id去跟蹤所有的tuple的。每個tuple知道它的祖宗的id(從spout發出來的那個tuple的id), 每當你新發射一個tuple, 它的祖宗id都會傳給這個新的tuple。所以當一個tuple被ack的時候,它會發一個消息給acker,告訴它這個tuple樹發生了怎么樣的變化。具體來說就是它告訴acker:  我已經完成了, 我有這些兒子tuple, 你跟蹤一下他們吧。

                                  (spout-tuple-id, tmp-ack-val)
                 tmp-ark-val =  tuple-id ^ (child-tuple-id1 ^ child-tuple-id2 ... )

tmp-ack-val是要ack的tuple的id與由它新創建的所有的tuple的id異或的結果

當一個tuple需要ack的時候,它到底選擇哪個acker來發送這個信息呢?

  storm使用一致性哈希來把一個spout-tuple-id對應到acker, 因為每一個tuple知道它所有的祖宗的tuple-id, 所以它自然可以算出要通知哪個acker來ack。

注:一個tuple可能存在於多個tuple樹,所有可能存在多個祖宗的tuple-id

acker是怎么知道每一個spout tuple應該交給哪個task來處理?

       當一個spout發射一個新的tuple, 它會簡單的發一個消息給一個合適的acker,並且告訴acker它自己的id(taskid), 這樣storm就有了taskid-tupleid的對應關系。 當acker發現一個樹完成處理了, 它知道給哪個task發送成功的消息。

Acker的高效性

acker task並不顯式的跟蹤tuple樹。對於那些有成千上萬個節點的tuple樹,把這么多的tuple信息都跟蹤起來會耗費太多的內存。相反, acker用了一種不同的方式, 使得對於每個spout tuple所需要的內存量是恆定的(20 bytes) .  這個跟蹤算法是storm如何工作的關鍵,並且也是它的主要突破。

        一個acker task存儲了一個spout-tuple-id到一對值的一個mapping。這個對子的第一個值是創建這個tuple的taskid, 這個是用來在完成處理tuple的時候發送消息用的。 第二個值是一個64位的數字稱作:ack val, ack val是整個tuple樹的狀態的一個表示,不管這棵樹多大。它只是簡單地把這棵樹上的所有創建的tupleid/ack的tupleid一起異或(XOR)。

當一個acker task 發現一個 ack val變成0了, 它知道這棵樹已經處理完成了。

例如下圖是一個簡單的Topology。

個簡單的 Topology

        ack_val的初值為0,varl_x表示新產生的tuple id ,它們經過Spout,Bolt1,Bolt2,Bolt3 處理,並與arv_val異或,最終arv_val變為0,表示tuple1被成功處理。

   下面看一個稍微復雜一點的例子:

注:紅色虛線框表示的是Acker組件,ack_val表示acker value的值,它的初值為0

        msg1綁定了兩個源tuple,它們的id分別為1001和1010.在經過Bolt1處理后新生成了tuple id為1110,新生成的tuple與傳入的tuple 1001進行異或得到的值為0111,然后Bolt1通過spout-tuple-id映射到指定的Acker組件,向它發送消息,Acker組件將Bolt1傳過來的值與ack_val異或,更新ack_val的值變為了0100。與此相同經過Bolt2處理后,ack_val的值變為0001。最后經Bolt3處理后ack_val的值變為了0,說明此時由msg1標識的Tuple處理成功,此時Acker組件會通過事先綁定的task id映射找到對應的Spout,然后調用該Spout的ack方法。

            其流程如下圖所示:


  注:1. Acker (ack bolt)組件由系統自動產生,一般來說一個topology只有一個ack bolt(當然可以通過配置參數指定多個),當bolt處理並下發完tuple給下一跳的bolt時,會發送一個ack給ack bolt。ack bolt通過簡單的異或原理(即同一個數與自己異或結果為零)來判定從spout發出的某一個Tuple是否已經被完全處理完畢。如果結果為真,ack bolt發送消息給spout,spout中的ack函數被調用並執行。如果超時,則發送fail消息給spout,spout中的fail函數被調用並執行,spout中的ack和fail的處理邏輯由用戶自行填寫。

2. Acker對於每個Spout-tuple保存一個ack-val的校驗值,它的初始值是0, 然后每發射一個tuple 就ack一個tuple,那么tuple的id都要跟這個校驗值異或一下,並且把得到的值更新為ack-val的新值。那么假設每個發射出去的tuple都被ack了, 那么最后ack-val一定是0(因為一個數字跟自己異或得到的值是0)。

A xor A = 0.

          A xor B…xor B xor A = 0,其中每一個操作數出現且僅出現兩次。

        3. tupleid是隨機的64位數字, ack val碰巧變成0(例如:ark_val = 1 ^ 2  ^ 3 = 0)而不是因為所有創建的tuple都完成了,這樣的概率極小。算一下就知道了, 就算每秒發生10000個ack, 那么需要50000000萬年才可能碰到一個錯誤。而且就算碰到了一個錯誤, 也只有在這個tuple失敗的時候才會造成數據丟失。 

      看看storm在每種異常情況下是怎么避免數據丟失的:

         1. 由於對應的task掛掉了,一個tuple沒有被ack: storm的超時機制在超時之后會把這個tuple標記為失敗,從而可以重新處理。

         2. Acker掛掉了: 這種情況下由這個acker所跟蹤的所有spout tuple都會超時,也就會被重新處理。

3. Spout掛掉了: 在這種情況下給spout發送消息的消息源負責重新發送這些消息。比如Kestrel和RabbitMQ在一個客戶端斷開之后會把所有”處理中“的消息放回隊列。

就像你看到的那樣, storm的可靠性機制是完全分布式的, 可伸縮的並且是高度容錯的。

三 Acker 編程接口

 

      在Spout中,Storm系統會為用戶指定的MessageId生成一個對應的64位的整數,作為整個Tuple Tree的RootId。RootId會被傳遞給Acker以及后續的Bolt來作為該消息單元的唯一標識。同時,無論Spout還是Bolt每次新生成一個Tuple時,都會賦予該Tuple一個唯一的64位整數的Id。

      當Spout發射完某個MessageId對應的源Tuple之后,它會告訴Acker自己發射的RootId以及生成的那些源Tuple的Id。而當Bolt處理完一個輸入Tuple並產生出新的Tuple時,也會告知Acker自己處理的輸入Tuple的Id以及新生成的那些Tuple的Id。Acker只需要對這些Id進行異或運算,就能判斷出該RootId對應的消息單元是否成功處理完成了。

   下面這個是spout要實現的接口:

public interface ISpout extends Serializable {  
void open(Map conf, TopologyContext context,  
         SpoutOutputCollector collector);  
 void close();  
 void nextTuple();  
 void ack(Object msgId);  
 void fail(Object msgId);

首先storm通過調用spout的nextTuple方法來獲取下一個tuple, Spout通過open方法參數里面提供的SpoutOutputCollector來發射新tuple到它的其中一個輸出消息流, 發射tuple的時候spout會提供一個message-id, 后面通過這個message-id來追蹤這個tuple。

this.collector.emit(new Values("hello world"),msgId);
注:msgId是提供給Acker組件使用的,Acker組件使用msgId來跟蹤Tuple樹

       接下來, 這個發射的tuple被傳送到消息處理者bolt那里, storm會跟蹤由此所產生的這課tuple樹。如果storm檢測到一個tuple被完全處理了, 那么storm會以最開始的那個message-id作為參數去調用消息源的ack方法;反之storm會調用spout的fail方法。值得注意的是, storm調用ack或者fail的task始終是產生這個tuple的那個task。所以如果一個spout被分成很多個task來執行, 消息執行的成功失敗與否始終會通知最開始發出tuple的那個task。

作為storm的使用者,有兩件事情要做以更好的利用storm的可靠性特征。 首先,在你生成一個新的tuple的時候要通知storm; 其次,完成處理一個tuple之后要通知storm。 這樣storm就可以檢測整個tuple樹有沒有完成處理,並且通知源spout處理結果。storm提供了一些簡潔的api來做這些事情。

    由一個tuple產生一個新的tuple稱為:anchoring。你發射一個新tuple的同時也就完成了一次anchoring。看下面這個例子: 這個bolt把一個包含一個句子的tuple分割成每個單詞一個tuple。

public class SplitSentence implements IRichBolt {  
      OutputCollector _collector;  
  
      public void prepare(Map conf,  
                       TopologyContext context,  
                       OutputCollector collector) {  
            _collector = collector;  
      }  
  
      public void execute(Tuple tuple) {  
              String sentence = tuple.getString(0);  
              for(String word: sentence.split(" ")) {  
                    _collector.emit(tuple,new Values(word));  
              }  
           _collector.ack(tuple);  
      }  
  
     publicvoid cleanup() {}  
     publicvoid declareOutputFields(OutputFieldsDeclarer declarer) {  
                declarer.declare(newFields("word"));  
     }  
 }

        看一下這個execute方法, emit的第一個參數是輸入tuple, 第二個參數則是輸出tuple, 這其實就是通過輸入tuple anchoring了一個新的輸出tuple。因為這個“單詞tuple”被anchoring在“句子tuple”一起, 如果其中一個單詞處理出錯,那么這整個句子會被重新處理。作為對比, 我們看看如果通過下面這行代碼來發射一個新的tuple的話會有什么結果。

_collector.emit(new Values(word));

        用這種方法發射會導致新發射的這個tuple脫離原來的tuple樹(unanchoring), 如果這個tuple處理失敗了, 整個句子不會被重新處理。一個輸出tuple可以被anchoring到多個輸入tuple。這種方式在stream合並或者stream聚合的時候很有用。一個多入口tuple處理失敗的話,那么它對應的所有輸入tuple都要重新執行。看看下面演示怎么指定多個輸入tuple:

List<Tuple> anchors = new ArrayList<Tuple>();
anchors.add(tuple1);
anchors.add(tuple2);
_collector.emit(anchors,new Values(1,2,3));

         我們通過anchoring來構造這個tuple樹,最后一件要做的事情是在你處理完這個tuple的時候告訴storm,  通過OutputCollector類的ack和fail方法來做,如果你回過頭來看看SplitSentence的例子, 你可以看到“句子tuple”在所有“單詞tuple”被發出之后調用了ack。

       你可以調用OutputCollector 的fail方法去立即將從消息源頭發出的那個tuple標記為fail, 比如你查詢了數據庫,發現一個錯誤,你可以馬上fail那個輸入tuple, 這樣可以讓這個tuple被快速的重新處理, 因為你不需要等那個timeout時間來讓它自動fail。

每個你處理的tuple, 必須被ack或者fail。因為storm追蹤每個tuple要占用內存。所以如果你不ack/fail每一個tuple, 那么最終你會看到OutOfMemory錯誤。

       大多數Bolt遵循這樣的規律:讀取一個tuple;發射一些新的tuple;在execute的結束的時候ack這個tuple。這些Bolt往往是一些過濾器或者簡單函數。Storm為這類規律封裝了一個BasicBolt類。如果用BasicBolt來做, 上面那個SplitSentence可以改寫成這樣:

publicclass SplitSentence implements IBasicBolt {  
    public void prepare(Map conf,  
                        TopologyContext context) {  
       }  
  
    public void execute(Tuple tuple,  
                        BasicOutputCollector collector) {  
              String sentence = tuple.getString(0);  
              for(String word: sentence.split(" ")) {  
                collector.emit(newValues(word));  
              }  
   }  
  
    publicvoid cleanup() {}  
  
    publicvoid declareOutputFields(  
                    OutputFieldsDeclarer declarer) {  
        declarer.declare(newFields("word"));  
    }  
}
 
        

   這個實現比之前的實現簡單多了, 但是功能上是一樣的,發送到BasicOutputCollector的tuple會自動和輸入tuple相關聯,而在execute方法結束的時候那個輸入tuple會被自動ack的。

    作為對比,處理聚合和合並的bolt往往要處理一大堆的tuple之后才能被ack, 而這類tuple通常都是多輸入的tuple, 所以這個已經不是IBasicBolt可以罩得住的了。

注:當一個Tuple處理失敗的時候,storm不會自動的重發該tuple,需要用戶自己來編寫邏輯重新處理fail掉的Tuple,可以將其放入一個列表中,在nextTuple()中獲取這些失敗的tuple,重新發射。

四 調整可靠性 

       acker task是非常輕量級的, 所以一個topology里面不需要很多acker。你可以通過Strom UI(id: -1)來跟蹤它的性能。 如果它的吞吐量看起來不正常,那么你就需要多加點acker了。

如果可靠性對你來說不是那么重要 — 你不太在意在一些失敗的情況下損失一些數據, 那么你可以通過不跟蹤這些tuple樹來獲取更好的性能。不去跟蹤消息的話會使得系統里面的消息數量減少一半, 因為對於每一個tuple都要發送一個ack消息。並且它需要更少的id來保存下游的tuple, 減少帶寬占用。

有三種方法可以去掉可靠性:

       第一是把Config.TOPOLOGY_ACKERS 設置成 0. 在這種情況下, storm會在spout發射一個tuple之后馬上調用spout的ack方法。也就是說這個tuple樹不會被跟蹤。

第二個方法是在tuple層面去掉可靠性。 你可以在發射tuple的時候不指定messageid來達到不跟蹤某個特定的spout tuple的目的。

最后一個方法是如果你對於一個tuple樹里面的某一部分到底成不成功不是很關心,那么可以在發射這些tuple的時候unanchor它們。 這樣這些tuple就不在tuple樹里面, 也就不會被跟蹤了。

五 小結

      在分布式系統中實現對數據的可靠處理是一件繁瑣的事情,storm將其實現的非常優雅,其Acker不僅使得對數據的可靠處理變得簡單而且還很高效,這個很值得學習和借鑒。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM