Storm+HBase實時實踐


1.HBase Increment計數器

 hbase counter的原理: read+count+write,正好完成,就是講key的value讀出,若存在,則完成累加,再寫入,若不存在,則按“0”處理,再加上你需要累加的值。

  傳統上,如果沒有 counter,當我們要給一個 column 的值 +1 或者其他數值時,就需要先從該 column 讀取值,然后在客戶端修改值,最后寫回給 Region Server,即一個 Read-Modify-Write (RMW) 操作。在這樣的過程中,按照 Lars 的描述1,還需要對操作所在的 row 事先加鎖,事后解鎖。會引起許多 contention,以及隨之而來很多問題。而 HBase 的 increment 接口就保證在 Region Server 端原子性的完成一個客戶端請求。

   RMW 操作的代碼:

db.read (table,keyname,fields, new HashMap < String,String > ( ) ) ;
db.update (table,keyname,values ) ;

  它並沒有對所操作的 row 進行加鎖、解鎖操作,而是簡單的讀取改寫。這在 counter 的應用場景中是不可接受的。不加鎖在大並發情況下,很容易導致 counter 的值與預期不符。

  HBase 引入 Increment/Counter 是非常重要的,對某些需要原子性更改操作的應用來說則是“致命”的。除了單個 increment 的接口 incrementColumnValue() 外,還有批量 increment 的接口increment(Increment),方便客戶端調用。

  除此之外,HBase 還在進行 Coprocessor 的開發,使計算直接在 Region Server 上進行,省去了繁瑣耗時的數據移動。

使用方法:

long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,long amount) throws IOException

 

2.Hbase讀寫

 

3.Storm架構

  http://shiyanjun.cn/archives/977.html

每一個工作進程執行一個topology的一個子集;一個運行的topology由運行在很多機器上的很多工作進程組成。

Topology: 是個應用的spout,bolt,grouping的組合

Storm Grouping:

  1. Shuffle Grouping :隨機分組,盡量均勻分布到下游Bolt中

    將流分組定義為混排。這種混排分組意味着來自Spout的輸入將混排,或隨機分發給此Bolt中的任務。shuffle grouping對各個task的tuple分配的比較均勻。

  2. Fields Grouping :按字段分組,按數據中field值進行分組;相同field值的Tuple被發送到相同的Task

    這種grouping機制保證相同field值的tuple會去同一個task,這對於WordCount來說非常關鍵,如果同一個單詞不去同一個task,那么統計出來的單詞次數就不對了。“if the stream is grouped by the “user-id” field, tuples with the same “user-id” will always go to the same task”. —— 小示例

  3. All grouping :廣播

    廣播發送, 對於每一個tuple將會復制到每一個bolt中處理。

  4. Global grouping :全局分組,Tuple被分配到一個Bolt中的一個Task,實現事務性的Topology。

    Stream中的所有的tuple都會發送給同一個bolt任務處理,所有的tuple將會發送給擁有最小task_id的bolt任務處理。

  5. None grouping :不分組

    不關注並行處理負載均衡策略時使用該方式,目前等同於shuffle grouping,另外storm將會把bolt任務和他的上游提供數據的任務安排在同一個線程下

  6. Direct grouping :直接分組 指定分組

    由tuple的發射單元直接決定tuple將發射給那個bolt,一般情況下是由接收tuple的bolt決定接收哪個bolt發射的Tuple。這是一種比較特別的分組方法,用這種分組意味着消息的發送者指定由消息接收者的哪個task處理這個消息。 只有被聲明為Direct Stream的消息流可以聲明這種分組方法。而且這種消息tuple必須使用emitDirect方法來發射。消息處理者可以通過TopologyContext來獲取處理它的消息的taskid (OutputCollector.emit方法也會返回taskid)

Storm Tuple

  storm中的數據首先是有spout收集,類似於一個消息源,spout的open()函數一般就是接收數據的地方,然后spout的 nextTuple()是發送(emit)tuple的地方。tuple到底是什么?感覺還是用英語來說比較容易理解吧,"A tuple is a named of values where each value can be any type."  tuple是一個類似於列表的東西,存儲的每個元素叫做field(字段)。我們用getString(i)可以獲得tuple的第i個字段。而其中的每個字段都可以任意類型的,也可以一個很長的字符串。我們可以用:

  1. String A = tuple.getString(0);  
  2. long a= tuple.getLong(1);  

 來得到我想要的數據,不過前提你是要知道你的tuple的組成。具體tuple是什么類型,完全取決於自己的程序,取決於spout中nextTuple()方法中emit發送的類型。

 

4.kafka+storm+hbase架構

kafka作為分布式消息系統,實時消息系統,有生產者和消費者;storm作為大數據的實時處理系統;hbase是apache hadoop 的數據庫,其具有高效的讀寫性能,把kafka生產的數據作為storm的源頭spout來消費,經過bolt處理把結果保存到hbase,進行持久化保存.

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM