看過一些別人寫的, 感覺有些東西沒太說清楚,個人主要以源代碼跟蹤,參考個人理解講述,有錯誤請指正。
1基本名詞
1.1 Tuple: 消息傳遞的基本單位。很多文章中介紹都是這么說的, 個人覺得應該更詳細一點。
在spout發送的時候,函數原型
public List<Integer> emit(List<Object> tuple, Object messageId) {
return emit(Utils.DEFAULT_STREAM_ID, tuple, messageId);
}
這里的tuple, 實際上是List<Object> 對象,返回的是 List<Integer> 是要發送的tast的IdsList
在bolt接收的時候, 函數原型
public void execute(Tuple tuple)
變成了一個Tuple對象, 結構應該也是一個list, List<Field1, value1, Field2, value2..>這樣的一個結構, FieldList ValueList, 我們根據對應的fieldname就可以取出對應的getIntegerByField方法
回到spout對象中來, 在spout有一個定義的輸出字段
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
這里定義的一個字段,所以我們在emit的時候就只能發送一個包含一個value的tuple(spout部分), storm會將field, 和 發送的value下標對應, 變成一個Tuple對象, 也就是上面說的
List<Field1, value1, Field2, value2..>這樣的一個結構, 在bolt 之間傳遞tuple, 發送又是List<Object> tuple, 根據組裝bolt定義的fiels, 再組合成Tuple對象給下一個Bolt處理
在發射的最后 還有一個 void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId); 因為上面emit的時候已經返回List<taskid>, 所以它就知道要發送給哪些taskid處理,然后將taskid 和 tuple放入隊列LinkedBlockingQueue
, 代碼如下
; worker.clj
(
defn
mk-transfer-
fn
[
transfer-queue
]
(
fn
[
task ^Tuple tuple
]
(.put ^LinkedBlockingQueue
transfer-queue
[
task tuple
]
)
))
總結: 每次emit都是根據List<Object>和定義的輸出Fields組合成一個Tuple對象,,每個接受對象接收的是Tuple對象,如果處理完再發送又再組合字段, 在emit的時候返回LIst<taskids>,所以就知道發送給哪些Task, 然后拿這些taskid和tuple再組合成一個任務隊列,通過zeromq發送到目標task,目標task接收到tuple進程處理至於並發度控制, 參考
http://www.cnblogs.com/chengxin1982/p/4001275.html
TupleID Tuple對應的ID, 在創建的時候賦予一個64位的id,主要用來跟蹤消息
MsgID 官方解釋 Emits a new tuple to the default output stream with the given message ID. 如果不指定,acker不會跟蹤。主要作用 , 在spout收到fail時候, 能夠定位到是哪條消息出錯,能夠決定重發. 使用實例 _collector.emit(new Values(sentence), new Integer(num));
acker 消息跟蹤者. acker 存儲一個Map<taskid, ack val> , taskid為祖宗tuple創建者的taskid, ack_val 為消息傳遞過程中的 tupleid的xor值,如果為0則知道是哪個spout或者bolt已經處理完了, 為什么會有bolt, 因為bolt在發射的時候,如果非錨定,就是不帶tuple發射,它會被認為是祖宗tuple, 上一個tuple會認為已經結束.
至於分配發射源分配到acker, storm采用一致性hash 祖宗tupleid來分配,因為在所有的tuple中都能知道祖宗tupleid,所以在子孫tuple處理時, 知道該發送給哪個acker跟蹤