不多說,直接上干貨!
Tuple元組
Tuple 是 Storm 的主要數據結構,並且是 Storm 中使用的最基本單元、數據模型和元組。
Tuple 描述
Tuple 就是一個值列表, Tuple 中的值可以是任何類型的,動態類型的Tuple的fields可以不用聲明;默認情況下,Storm中的Tuple支持私有類型、字符串、字節數組等作為它的字段值,如果使用其他類型,就需要序列化該類型。
Tuple的字段默認類型有 : integer、 float、 double、 long、short、 string、 byte、 binary(byte[])。
Tuple元組,是消息傳遞的基本單元,是一個命名的值列表,元組中的字段可以是任何類型的對象。Storm使用元組作為其數據模型,元組支持所有的基本類型、字符串和字節數組作為字段值,只要實現類型的序列化接口就可以使用該類型的對象。
元組本來應該是一個key-value的Map,但是由於各個組件間傳遞的元組的字段名稱已經事先定義好,所以只要按序把元組填入各個value即可,所以元組是一個vlue的List。
Tuple是Storm采用的數據表示模型,所有的數據都以Tuple的形式在各個組件之間流動。Tuple是一組字段列表,每個字段由一個字段名和字段值組成,每個Tuple類似於數據庫中的一行記錄。在默認的情況下,Tuple的字段類型可以是integer、long、short、byte、string、double、float、boolean和byte array。當然,你也可以通過實現序列化器自定義類型。
Tuple 數據結構如圖 1 所示。
圖 1 Tuple 數據結構
Tuple 可以理解成鍵值對。例如,創建一個Bolt 要發送兩個字段(命名為 double 和 triple),其中鍵就是定義在declareOutputFields 方法中的 Fields 對象,值就是在 emit 方法中發送的 Values 對象。
以下是一個簡單例子
public class DoubleAndTripleBolt extends BaseRichBolt { OutputCollectorBase _collector;
@Override public void prepare(Map conf, TopologyContext context, OutputCollectorBase collector) { _collector = collector; }
@Override public void execute(Tuple input) { int val = input.getInteger(0); _collector.emit(input, new Values(val*2, val*3)); _collector.ack(input); }
@Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("double", "triple")); }}
此外,在使用的 Storm Java 包中, backtype.storm.tuple 主要有以下幾個類:
Fileds.class MessageId.class Tuple.class TupleImpl.class Values.class
列出以上內容是為了更好地理解 Tuple,這樣能夠從本質上理解 Tuple,在使用時更加得心應手。
Tuple 的生命周期
了解一個 Tuple 的生命周期就需要查看源碼,如下的 Java 代碼展示了 Spout(消息源)接口發出 Tuple(消息)的整個過程。
public interface ISpout extends Serializable { void open(Map conf, TopologyContext context, SpoutOutputCollector collector); void nextTuple(); void ack(Object msgId); void fail(Object msgId); void close(); }
首 先, Storm 調 用 Spout(消息源)的nextTuple 方法來獲取下一個Tuple, Spout通過Open 方法的參數提供的SpoutOutputCollector將新Tuple發射到其中一個輸出消息流。
注意:發射Tuple 時, Spout提供一個message-id,通過這個ID 來追蹤該Tuple。
接下來, Storm跟蹤該Tuple的樹形結構是否成功創建,並根據 messageid調用Spout中的ack函數,以確認Tuple是否被完全處理。如果Tuple超時,則調用 Spout 的 fail 方法。
由此看出,同一個Tuple不管是acked,還是failed都是由創建它的Spout發出並維護的,所以,即使Spout 在集群環境中同時執行很多的任務,該Tuple 也不會被其他任務調用或生成 acked或 failed 狀態。總之, Storm會利用內部的 Acker 機制保證每個Tuple 被可靠地處理。最后,在任務完成后,Spout調用Close方法結束 Tuple 的使命。
比如