流聚合(stream join)是指將具有共同元組(tuple)字段的數據流(兩個或者多個)聚合形成一個新的數據流的過程。
從定義上看,流聚合和SQL中表的聚合(table join)很像,但是二者有明顯的區別:table join的輸入是有限的,並且join的語義是非常明確的;而流聚合的語義是不明確的並且輸入流是無限的。
數據流的聚合類型跟具體的應用有關。一些應用把兩個流發出的所有的tuple都聚合起來——不管多長時間;而另外一些應用則只會聚合一些特定的tuple。而另外一些應用的聚合邏輯又可能完全不一樣。而這些聚合類型里面最常見的類型是把所有的輸入流進行一樣的划分,這個在storm里面用fields grouping在相同字段上進行grouping就可以實現。
下面是對storm-starter(代碼見:https://github.com/nathanmarz/storm-starter)中有關兩個流的聚合的示例代碼剖析:
先看一下入口類SingleJoinExample。
(1)這里首先創建了兩個發射源spout,分別是genderSpout和ageSpout:
FeederSpout genderSpout = new FeederSpout(new Fields("id", "gender")); FeederSpout ageSpout = new FeederSpout(new Fields("id", "age")); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("gender", genderSpout); builder.setSpout("age", ageSpout);
其中genderSpout包含兩個tuple字段:id和gender,ageSpout包含兩個tuple字段:id和age(這里流聚合就是通過將相同id的tuple進行聚合,得到一個新的輸出流,包含id、gender和age字段)。
(2)為了不同的數據流中的同一個id的tuple能夠落到同一個task中進行處理,這里使用了storm中的fileds grouping在id字段上進行分組划分:
builder.setBolt("join", new SingleJoinBolt(new Fields("gender", "age"))) .fieldsGrouping("gender", new Fields("id")) .fieldsGrouping("age", new Fields("id"));
從中可以看到,SingleJoinBolt就是真正進行流聚合的地方。下面我們來看看:
(1)SingleJoinBolt構造時接收一個Fileds對象,其中傳進的是聚合后將要被輸出的字段(這里就是gender和age字段),保存到變量_outFileds中。
(2)接下來看看完成SingleJoinBolt的構造后,SingleJoinBolt在真正開始接收處理tuple之前所做的准備工作(代碼見prepare方法):
a)首先,將保存OutputCollector對象,創建TimeCacheMap對象,設置超時回調接口,用於tuple處理失敗時fail消息;緊接着記錄數據源的個數:
_collector = collector; int timeout = ((Number) conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue(); _pending = new TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>>(timeout, new ExpireCallback()); _numSources = context.getThisSources().size();
b)遍歷TopologyContext中不同數據源,得到所有數據源(這里就是genderSpout和ageSpout)中公共的Filed字段,保存到變量_idFields中(例子中就是id字段),同時將_outFileds中字段所在數據源記錄下來,保存到一張HashMap中_fieldLocations,以便聚合后獲取對應的字段值。
Set<String> idFields = null; for(GlobalStreamId source: context.getThisSources().keySet()) { Fields fields = context.getComponentOutputFields(source.get_componentId(), source.get_streamId()); Set<String> setFields = new HashSet<String>(fields.toList()); if(idFields==null) idFields = setFields; else idFields.retainAll(setFields); for(String outfield: _outFields) { for(String sourcefield: fields) { if(outfield.equals(sourcefield)) { _fieldLocations.put(outfield, source); } } } } _idFields = new Fields(new ArrayList<String>(idFields)); if(_fieldLocations.size()!=_outFields.size()) { throw new RuntimeException("Cannot find all outfields among sources"); }
(3)好了,下面開始兩個spout流的聚合過程了(代碼見execute方法):
首先,從tuple中獲取_idFields字段,如果不存在於等待被處理的隊列_pending中,則加入一行,其中key是獲取到的_idFields字段,value是一個空的HashMap<GlobalStreamId, Tuple>對象,記錄GlobalStreamId到Tuple的映射。
List<Object> id = tuple.select(_idFields); GlobalStreamId streamId = new GlobalStreamId(tuple.getSourceComponent(), tuple.getSourceStreamId()); if(!_pending.containsKey(id)) { _pending.put(id, new HashMap<GlobalStreamId, Tuple>()); }
從_pending隊列中,獲取當前GlobalStreamId streamId對應的HashMap對象parts中:
Map<GlobalStreamId, Tuple> parts = _pending.get(id);
如果streamId已經包含其中,則拋出異常,接收到同一個spout中的兩條一樣id的tuple,否則將該streamid加入parts中:
if(parts.containsKey(streamId)) throw new RuntimeException("Received same side of single join twice"); parts.put(streamId, tuple);
如果parts已經包含了聚合數據源的個數_numSources時,從_pending隊列中移除這條記錄,然后開始構造聚合后的結果字段:依次遍歷_outFields中各個字段,從_fieldLocations中取到這些outFiled字段對應的GlobalStreamId,緊接着從parts中取出GlobalStreamId對應的outFiled,放入聚合后的結果中。
if(parts.size()==_numSources) { _pending.remove(id); List<Object> joinResult = new ArrayList<Object>(); for(String outField: _outFields) { GlobalStreamId loc = _fieldLocations.get(outField); joinResult.add(parts.get(loc).getValueByField(outField)); }
最后通過_collector將parts中存放的tuple和聚合后的輸出結果發射出去,並ack這些tuple已經處理成功。
_collector.emit(new ArrayList<Tuple>(parts.values()), joinResult); for(Tuple part: parts.values()) { _collector.ack(part); }
}
否則,繼續等待兩個spout流中這個streamid都到齊后再進行聚合處理。
(4)最后,聲明一下輸出字段(代碼見declareOutputFields方法):
declarer.declare(_outFields);