Trident是基於Storm進行實時留處理的高級抽象,提供了對實時流4的聚集,投影,過濾等操作,從而大大減少了開發Storm程序的工作量。Trident還提供了針對數據庫或則其他持久化存儲的有狀態的,增量的更新操作的原語。
若我們要開發一個對文本中的詞頻進行統計的程序,使用Storm框架的話我們需要開發三個Storm組件:
1.一個Spout負責收集文本信息並分段,做為sentence字段發送給下游的Bolt
2.一個Bolt將將每段文本粉刺,將分詞結果以word字段發送給下游的Bolt
3.一個Bolt對詞頻進行統計,把統計結果記錄在count字段並存儲
如果使用Trident我們可以使用一下代碼完成上述操作:
1 FixedBatchSpout spout = new FixedBatchSpout(new Fields("setence"),3, 2 new Values("the cow jump over the moon"), 3 new Values("the man went to the store and bought some candy"), 4 new Values("four score and seven years ago"), 5 new Values("how many apples can you eat")); 6 spout.setCycle(true); 7 TridentTopology topology = new TridentTopology(); 8 TridentState workcount = topology.newStream("spout",spout) 9 .each(new Fields("setence"),new Split(),new Fields("word")) 10 .groupBy(new Fields("word")) 11 .persistentAggregate(new MemoryMapState.Factory(),new Count(),new Fields("count")) 12 .parallelismHint(6);
上述這段代碼會被Trident框架轉為為使用Storm開發時的三個步驟
代碼的前兩行使用FixedBatchSpout不斷循環生成參數里列出的四個句子,第7行聲明了TridentTopology對象,並在第8行的newStream方法中引用了FixedBatchSpout。Trident是按批處理數據的,FixedBatchSpout生成的數據是按照下圖的方式一批一批的發送到下一個處理單元的,后續處理單元也是按照這種方式把數據發送到其他節點。
在上述的第9行使用Split對文本分詞,並發分詞結果存儲到Word字段中Split的定義如下:
1 public class Split extends BaseFunction { 2 3 @Override 4 public void execute(TridentTuple tuple, TridentCollector collector) { 5 for(String word: tuple.getString(0).split(" ")) { 6 if(word.length() > 0) { 7 collector.emit(new Values(word)); 8 } 9 } 10 } 11 12 }
在each方法宏也可以實現過濾,如只統計單詞長度超過10個字母長度的單詞的過濾可以定義如下:
.each(new Fields("word"), new BaseFilter() { public boolean isKeep(TridentTuple tuple) { return tuple.getString(0).length() > 10; } })
代碼gropuBy(new Fields("word"))對word進行聚集操作,並在其后使用Count對象進行計數。之后將得到的結果儲存到內存中。Trident不僅支持將結果存儲到內存中,也支持將結果存儲到其他的介質,如數據庫,Memcached。如要將最終結果以key-value的方式存儲到Memcached,可以使用下面的方式:
persistentAggregate(new Memcached.transactional(local),new Count(),new Fields("count"))
實時任務的關鍵問題是如何處理對數據更新的冪等問題,任務可能失敗或則重啟,因此更新操作可能被重復執行。以上述為例,發送到Count的數據可能因為節點的重啟或則網絡故障導致的其他原因致使被重復發送,從而引起數據的重復統計,為了避免這個問題Trident提供了事物支持,由於數據是按批發送到Count節點的,Trident對每批單詞都分配一個Transaction id。上面的代碼中,每完成一批單詞的統計,就將這批數據的統計結果連同Transaction id一起存儲到Memcached中。數據更新的時候,Trident會比較Memcached中的Transaction id和新到達數據的Transaction id,如果同一批數據被重復發送,其Transaction id就會等於Memcached存儲的Transaction id,新數據將會被忽略。另外每批數據的Transaction id是有嚴格的順序的Transaction id 為2的數據沒有處理完的情況下,絕對不會處理Transaction id為3的數據。
有時,一個任務有多個數據源,每一個數據源都是以TridentState的形式出現在任務定義中的,比如上面提到的wordcount任務生成的數據就可以被其他的任務所使用,可以使用stateQuery方法引用別的TridentState,stateQuery的定義如下:
Stream stateQuery(TridentState state, QueryFunction function, Fields functionFields)
Trident的數據模型稱作"TridentTuple"---帶名字的Values列表。在Topology中,tuple是在順序的操作集合中增量生成的。Operation通常包含一組輸入字段和提交的功能字段。Operation的輸入字段通常是將tuple中的一個子集作為操作集合的輸入,而功能字段則是命名提交的字段。
例如,聲明一個名為"students"的Stream,可能包含名字,性別,學號,分數等字段。添加一個按分數過濾的過濾器ScoreFilter,使得tuples只過濾分數大於60的學生,定義一個分數過濾器,當選擇輸入字段的時候Trident會自動過濾出一個子集,該操作十分的高效。
class ScoreFilter extends BaseFilter{ public boolean isKeep(TridentTuple tuple) { return tuple.getInteger(0) >= 60; } }
如果我們相對字段進行計算,並且提交給TridentTuple,可以模擬一下計算。
class AddAndSubFuction extends BaseFunction{ public void execute(TridentTuple tuple, TridentCollector collector) { int res1 = tuple.getInteger(0); int res2 = tuple.getInteger(1); int sub = res1 > res2 ? res1 - res2 : res2 - res1; collector.emit(new Values(res1+res2,sub)); } }
此函數接收兩個整數作為參數,並計算兩個數的和以及差,作為兩個新的Fields提交。