Storm概念學習系列之Tuple元組(數據載體)


 

  不多說,直接上干貨!

 

 

Tuple元組

  Tuple 是 Storm 的主要數據結構,並且是 Storm 中使用的最基本單元、數據模型和元組。

 

      

 

 

 

 

 

 

Tuple 描述

  Tuple 就是一個值列表, Tuple 中的值可以是任何類型的,動態類型的Tuple的fields可以不用聲明;默認情況下,Storm中的Tuple支持私有類型、字符串、字節數組等作為它的字段值,如果使用其他類型,就需要序列化該類型

  Tuple的字段默認類型有 :  integer、 float、 double、 long、short、 string、 byte、 binary(byte[])

   

  Tuple元組,是消息傳遞的基本單元,是一個命名的值列表,元組中的字段可以是任何類型的對象。Storm使用元組作為其數據模型,元組支持所有的基本類型、字符串和字節數組作為字段值,只要實現類型的序列化接口就可以使用該類型的對象。

  元組本來應該是一個key-value的Map,但是由於各個組件間傳遞的元組的字段名稱已經事先定義好,所以只要按序把元組填入各個value即可,所以元組是一個vlue的List

 

 

  Tuple是Storm采用的數據表示模型,所有的數據都以Tuple的形式在各個組件之間流動。Tuple是一組字段列表,每個字段由一個字段名和字段值組成,每個Tuple類似於數據庫中的一行記錄。在默認的情況下,Tuple的字段類型可以是integer、long、short、byte、string、double、float、boolean和byte array。當然,你也可以通過實現序列化器自定義類型。

 

 


  Tuple 數據結構如圖 1 所示。

                        

                             圖 1 Tuple 數據結構

 

   Tuple 可以理解成鍵值對。例如,創建一個Bolt 要發送兩個字段(命名為 double 和 triple),其中鍵就是定義在declareOutputFields 方法中的 Fields 對象,值就是在 emit 方法中發送的 Values 對象。

 

 

 

  以下是一個簡單例子

public class DoubleAndTripleBolt extends BaseRichBolt {
OutputCollectorBase _collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollectorBase collector) { _collector = collector; }
@Override
public void execute(Tuple input) { int val = input.getInteger(0); _collector.emit(input, new Values(val*2, val*3)); _collector.ack(input); }
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("double", "triple")); }}

 

 

  此外,在使用的 Storm Java 包中, backtype.storm.tuple 主要有以下幾個類:

Fileds.class
MessageId.class
Tuple.class
TupleImpl.class
Values.class

  列出以上內容是為了更好地理解 Tuple,這樣能夠從本質上理解 Tuple,在使用時更加得心應手。

 

 

 

 

Tuple 的生命周期

  了解一個 Tuple 的生命周期就需要查看源碼,如下的 Java 代碼展示了 Spout(消息源)接口發出 Tuple(消息)的整個過程。

public interface ISpout extends Serializable {
void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
void nextTuple();
void ack(Object msgId);
void fail(Object msgId);
void close();
}

  首 先, Storm 調 用 Spout(消息源)的nextTuple 方法來獲取下一個Tuple, Spout通過Open 方法的參數提供的SpoutOutputCollector將新Tuple發射到其中一個輸出消息流

    注意:發射Tuple 時, Spout提供一個message-id,通過這個ID 來追蹤該Tuple。

   接下來, Storm跟蹤該Tuple的樹形結構是否成功創建,並根據 messageid調用Spout中的ack函數,以確認Tuple是否被完全處理。如果Tuple超時,則調用 Spout 的 fail 方法。

  由此看出,同一個Tuple不管是acked,還是failed都是由創建它的Spout發出並維護的,所以,即使Spout 在集群環境中同時執行很多的任務,該Tuple 也不會被其他任務調用或生成 acked或 failed 狀態。總之, Storm會利用內部的 Acker 機制保證每個Tuple 被可靠地處理。最后,在任務完成后,Spout調用Close方法結束 Tuple 的使命。

 

  比如

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM