理解Storm可靠性消息


看過一些別人寫的, 感覺有些東西沒太說清楚,個人主要以源代碼跟蹤,參考個人理解講述,有錯誤請指正。

1基本名詞

1.1 Tuple: 消息傳遞的基本單位。很多文章中介紹都是這么說的, 個人覺得應該更詳細一點。

 在spout發送的時候,函數原型

 public List<Integer> emit(List<Object> tuple, Object messageId) {
        return emit(Utils.DEFAULT_STREAM_ID, tuple, messageId);
    }

這里的tuple, 實際上是List<Object> 對象,返回的是 List<Integer> 是要發送的tast的IdsList

在bolt接收的時候, 函數原型

public void execute(Tuple tuple)

變成了一個Tuple對象,  結構應該也是一個list, List<Field1, value1, Field2, value2..>這樣的一個結構, FieldList ValueList, 我們根據對應的fieldname就可以取出對應的getIntegerByField方法

回到spout對象中來, 在spout有一個定義的輸出字段

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

這里定義的一個字段,所以我們在emit的時候就只能發送一個包含一個value的tuple(spout部分), storm會將field, 和 發送的value下標對應, 變成一個Tuple對象,  也就是上面說的

List<Field1, value1, Field2, value2..>這樣的一個結構,  在bolt 之間傳遞tuple, 發送又是List<Object> tuple, 根據組裝bolt定義的fiels, 再組合成Tuple對象給下一個Bolt處理

在發射的最后 還有一個 void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId);  因為上面emit的時候已經返回List<taskid>, 所以它就知道要發送給哪些taskid處理,然后將taskid 和 tuple放入隊列LinkedBlockingQueue, 代碼如下

; worker.clj

( defn mk-transfer- fn [ transfer-queue ]

( fn [ task ^Tuple tuple ]

(.put ^LinkedBlockingQueue

transfer-queue [ task tuple ] )

))
然后單獨會開啟一個叫async-loop的線程,取出每條記錄(taskid, tuple), 然后worker會從當前task建立一個到目標task的zeromq連接, 通過zeromq將tuple發送給目標task
 

總結: 每次emit都是根據List<Object>和定義的輸出Fields組合成一個Tuple對象,,每個接受對象接收的是Tuple對象,如果處理完再發送又再組合字段, 在emit的時候返回LIst<taskids>,所以就知道發送給哪些Task, 然后拿這些taskid和tuple再組合成一個任務隊列,通過zeromq發送到目標task,目標task接收到tuple進程處理至於並發度控制, 參考

http://www.cnblogs.com/chengxin1982/p/4001275.html

 

TupleID Tuple對應的ID,  在創建的時候賦予一個64位的id,主要用來跟蹤消息

MsgID  官方解釋 Emits a new tuple to the default output stream with the given message ID. 如果不指定,acker不會跟蹤。主要作用 , 在spout收到fail時候, 能夠定位到是哪條消息出錯,能夠決定重發. 使用實例  _collector.emit(new Values(sentence),  new Integer(num));

acker 消息跟蹤者. acker 存儲一個Map<taskid, ack val> ,  taskid為祖宗tuple創建者的taskid ack_val 為消息傳遞過程中的 tupleid的xor值,如果為0則知道是哪個spout或者bolt已經處理完了, 為什么會有bolt, 因為bolt在發射的時候,如果非錨定,就是不帶tuple發射,它會被認為是祖宗tuple, 上一個tuple會認為已經結束.
至於分配發射源分配到acker, storm采用一致性hash 祖宗tupleid來分配,因為在所有的tuple中都能知道祖宗tupleid,所以在子孫tuple處理時, 知道該發送給哪個acker跟蹤

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM