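Reference: Implementing Real-Time Trending Topics With a Distributed Rolling Count Algorithm in Storm. It is well illustrated; had I seen it earlier, I would simply have translated that article...
This is a topology that computes the top N words, useful for things like trending topics or trending images on Twitter.
It implements sliding-window counting and top-N ranking, which makes it fairly interesting, so let's walk through the code.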
Topology
This topology is slightly more complex than usual, mainly because it uses different grouping strategies: fieldsGrouping and globalGrouping.
    String spoutId = "wordGenerator";
    String counterId = "counter";
    String intermediateRankerId = "intermediateRanker";
    String totalRankerId = "finalRanker";
    builder.setSpout(spoutId, new TestWordSpout(), 5);
    builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).fieldsGrouping(spoutId, new Fields("word"));
    builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(counterId, new Fields("obj"));
    builder.setBolt(totalRankerId, new TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId);
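To make the effect of fieldsGrouping concrete, here is a minimal sketch of routing by the hash of the grouping field. The class FieldsGroupingSketch and the method chooseTask are hypothetical names, and the hashing shown is an assumption for illustration only; Storm's real implementation may differ in detail. The point is only that equal field values always land on the same task.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of fields grouping; this is not Storm's actual code. */
final class FieldsGroupingSketch {

    /** Picks a task index from the values of the grouping fields. */
    static int chooseTask(List<Object> groupingValues, int numTasks) {
        // floorMod keeps the index non-negative even for negative hash codes.
        return Math.floorMod(groupingValues.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int counterTasks = 4; // parallelism hint of the counter bolt in the topology
        for (String word : Arrays.asList("storm", "bolt", "storm", "spout", "storm")) {
            int task = chooseTask(Arrays.<Object>asList(word), counterTasks);
            System.out.println(word + " -> counter task " + task);
        }
    }
}
```

Every occurrence of "storm" maps to the same task index, which is what lets each counter task keep a complete count for the words it owns.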
RollingCountBolt
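RollingCountBolt comes first. It subscribes with fieldsGrouping on word, so the same word is always sent to the same task of the bolt. The field name is the one the upstream component specified in declareOutputFields.
RollingCountBolt does time-window-based counting, so it takes two parameters: the length of the sliding window in seconds and the emit frequency in seconds.
new RollingCountBolt(9, 3) means: output the counts of the latest 9-second sliding window every 3 seconds.
1. Create the SlidingWindowCounter (SlidingWindowCounter and SlotBasedCounter are covered below)
    counter = new SlidingWindowCounter(this.windowLengthInSeconds / this.windowUpdateFrequencyInSeconds);
How is the number of slots determined? For a 9-second window that emits every 3 seconds, 9/3 = 3 slots are needed.
Within each 3-second interval, countObjAndAck(tuple) is called repeatedly to increment the count of the tuple's object in the current slot.
Every 3 seconds emitCurrentWindowCounts is triggered; it slides the window (via getCountsThenAdvanceWindow) and emits (Map<obj, total count within the window>, actual window length).
The emit cannot fire at exactly 3-second intervals; there is always some drift, so the actual window length has to be reported as well.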
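One way to obtain the actual window length is to remember when each slot started and subtract the oldest start time at emit time. The sketch below assumes exactly that; the class ActualWindowLength and the method onEmit are hypothetical names, and the source does not show how RollingCountBolt really tracks this. Timestamps are passed in explicitly so the example stays deterministic.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical helper: tracks slot start times to report the real window length. */
final class ActualWindowLength {
    private final Deque<Long> slotStartMillis = new ArrayDeque<>();
    private final int numSlots;

    ActualWindowLength(int numSlots, long startMillis) {
        this.numSlots = numSlots;
        slotStartMillis.addLast(startMillis); // the first slot starts now
    }

    /** Called on every emit; returns the seconds covered by the window being emitted. */
    long onEmit(long nowMillis) {
        long oldestStart = slotStartMillis.peekFirst();
        long seconds = (nowMillis - oldestStart) / 1000;
        slotStartMillis.addLast(nowMillis);   // the next slot starts at this emit
        if (slotStartMillis.size() > numSlots) {
            slotStartMillis.removeFirst();    // the oldest slot leaves the window
        }
        return seconds;
    }

    public static void main(String[] args) {
        ActualWindowLength tracker = new ActualWindowLength(3, 0L);
        long[] emitTimes = {3100L, 6050L, 9200L, 13500L}; // ticks arrive with drift
        for (long t : emitTimes) {
            System.out.println("emit at " + t + " ms covers " + tracker.onEmit(t) + " s");
        }
    }
}
```

With 3 slots the first two emits cover only 3 and 6 seconds because the window is still filling; after that the reported length hovers around 9 seconds and grows when a tick arrives late.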
2. TupleHelpers.isTickTuple(tuple), TickTuple
One thing not covered so far is how the emit gets triggered. This deserves an explanation, because it relies on Storm's tick tuple feature.
The feature is quite handy, for example for batched database writes, or for time-window statistics as here.
The "__system" component periodically sends tuples on the "__tick" stream to the task.
The frequency is configured by TOPOLOGY_TICK_TUPLE_FREQ_SECS, which can be set in defaults.yaml,
or in code by overriding getComponentConfiguration():
    public Map<String, Object> getComponentConfiguration() {
        Map<String, Object> conf = new HashMap<String, Object>();
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);
        return conf;
    }
Once this is configured, Storm periodically sends tick tuples to the task.
Checking isTickTuple is then all it takes to implement timed triggering.
    public static boolean isTickTuple(Tuple tuple) {
        return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) // SYSTEM_COMPONENT_ID == "__system"
            && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID); // SYSTEM_TICK_STREAM_ID == "__tick"
    }
In the end, the bolt's output is collector.emit(new Values(obj, count, actualWindowLengthInSeconds));
that is: obj, count (the total count within the window), and the actual window length.
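The tick-driven pattern can be sketched without Storm on the classpath. In the sketch below, Msg is a hypothetical stand-in for a Storm Tuple, and TickDispatchSketch is a hypothetical bolt-like class; only the two constant values come from the text above. Ordinary messages accumulate state, and a tick message flushes it, which is the same shape as the batched database write mentioned earlier.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, Storm-free model of a bolt that flushes its state on tick tuples. */
final class TickDispatchSketch {
    static final String SYSTEM_COMPONENT_ID = "__system";
    static final String SYSTEM_TICK_STREAM_ID = "__tick";

    /** Hypothetical stand-in for a Storm tuple. */
    static final class Msg {
        final String sourceComponent;
        final String streamId;
        final String word;

        Msg(String sourceComponent, String streamId, String word) {
            this.sourceComponent = sourceComponent;
            this.streamId = streamId;
            this.word = word;
        }
    }

    static boolean isTick(Msg m) {
        return SYSTEM_COMPONENT_ID.equals(m.sourceComponent)
            && SYSTEM_TICK_STREAM_ID.equals(m.streamId);
    }

    private final Map<String, Long> pending = new HashMap<>();
    final List<Map<String, Long>> emitted = new ArrayList<>();

    void execute(Msg m) {
        if (isTick(m)) {
            emitted.add(new HashMap<>(pending)); // timed trigger: flush a snapshot
            pending.clear();
        } else {
            pending.merge(m.word, 1L, Long::sum); // normal tuple: just accumulate
        }
    }

    public static void main(String[] args) {
        TickDispatchSketch bolt = new TickDispatchSketch();
        bolt.execute(new Msg("wordGenerator", "default", "a"));
        bolt.execute(new Msg("wordGenerator", "default", "b"));
        bolt.execute(new Msg("wordGenerator", "default", "a"));
        bolt.execute(new Msg(SYSTEM_COMPONENT_ID, SYSTEM_TICK_STREAM_ID, null));
        System.out.println(bolt.emitted);
    }
}
```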
SlotBasedCounter
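A slot-based counter. It is a generic class, so the type T of the counted objects can be specified.
The class is simple: it maintains a map from each counted object to a set of slots (implemented as a long array) and lets you increment or reset any slot.
The key structure is Map<T, long[]> objToCounts: each obj maps to a long array of size numSlots, so numSlots separate counts can be kept per obj.
incrementCount: increments one slot of one obj, creating the counts array on first use.
getCount, getCounts: get the value of one slot of one obj, or the sum over all slots for every tracked obj.
wipeSlot, resetSlotCountToZero: reset a given slot to 0 for all objects, or for a single obj.
wipeZeros: removes every obj whose total count is 0, to free memory.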
    import java.io.Serializable;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public final class SlotBasedCounter<T> implements Serializable {

        private static final long serialVersionUID = 4858185737378394432L;

        private final Map<T, long[]> objToCounts = new HashMap<T, long[]>();
        private final int numSlots;

        public SlotBasedCounter(int numSlots) {
            if (numSlots <= 0) {
                throw new IllegalArgumentException("Number of slots must be greater than zero (you requested " + numSlots + ")");
            }
            this.numSlots = numSlots;
        }

        public void incrementCount(T obj, int slot) {
            long[] counts = objToCounts.get(obj);
            if (counts == null) {
                counts = new long[this.numSlots];
                objToCounts.put(obj, counts);
            }
            counts[slot]++;
        }

        public long getCount(T obj, int slot) {
            long[] counts = objToCounts.get(obj);
            if (counts == null) {
                return 0;
            } else {
                return counts[slot];
            }
        }

        public Map<T, Long> getCounts() {
            Map<T, Long> result = new HashMap<T, Long>();
            for (T obj : objToCounts.keySet()) {
                result.put(obj, computeTotalCount(obj));
            }
            return result;
        }

        private long computeTotalCount(T obj) {
            long[] curr = objToCounts.get(obj);
            long total = 0;
            for (long l : curr) {
                total += l;
            }
            return total;
        }

        /**
         * Reset the slot count of any tracked objects to zero for the given slot.
         *
         * @param slot
         */
        public void wipeSlot(int slot) {
            for (T obj : objToCounts.keySet()) {
                resetSlotCountToZero(obj, slot);
            }
        }

        private void resetSlotCountToZero(T obj, int slot) {
            long[] counts = objToCounts.get(obj);
            counts[slot] = 0;
        }

        private boolean shouldBeRemovedFromCounter(T obj) {
            return computeTotalCount(obj) == 0;
        }

        /**
         * Remove any object from the counter whose total count is zero (to free up memory).
         */
        public void wipeZeros() {
            Set<T> objToBeRemoved = new HashSet<T>();
            for (T obj : objToCounts.keySet()) {
                if (shouldBeRemovedFromCounter(obj)) {
                    objToBeRemoved.add(obj);
                }
            }
            for (T obj : objToBeRemoved) {
                objToCounts.remove(obj);
            }
        }
    }
SlidingWindowCounter
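SlidingWindowCounter is just a further wrapper around SlotBasedCounter; it provides the sliding-window concept through headSlot and tailSlot.
incrementCount: only headSlot can be incremented; the other slots hold the window's historical data.
The core operation is getCountsThenAdvanceWindow:
1. Fetch Map<T, Long> counts, a map from each object to the sum of all its slots in the window.
2. Call wipeZeros to remove objs that are no longer in use and free memory.
3. Most importantly, clear tailSlot and call advanceHead to slide the window.
advanceHead shows how a circular sliding window is implemented on top of an array: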
    import java.io.Serializable;
    import java.util.Map;

    public final class SlidingWindowCounter<T> implements Serializable {

        private static final long serialVersionUID = -2645063988768785810L;

        private SlotBasedCounter<T> objCounter;
        private int headSlot;
        private int tailSlot;
        private int windowLengthInSlots;

        public SlidingWindowCounter(int windowLengthInSlots) {
            if (windowLengthInSlots < 2) {
                throw new IllegalArgumentException("Window length in slots must be at least two (you requested " + windowLengthInSlots + ")");
            }
            this.windowLengthInSlots = windowLengthInSlots;
            this.objCounter = new SlotBasedCounter<T>(this.windowLengthInSlots);
            this.headSlot = 0;
            this.tailSlot = slotAfter(headSlot);
        }

        public void incrementCount(T obj) {
            objCounter.incrementCount(obj, headSlot);
        }

        /**
         * Return the current (total) counts of all tracked objects, then advance the window.
         *
         * Whenever this method is called, we consider the counts of the current sliding window to be available to and
         * successfully processed "upstream" (i.e. by the caller). Knowing this we will start counting any subsequent
         * objects within the next "chunk" of the sliding window.
         *
         * @return
         */
        public Map<T, Long> getCountsThenAdvanceWindow() {
            Map<T, Long> counts = objCounter.getCounts();
            objCounter.wipeZeros();
            objCounter.wipeSlot(tailSlot);
            advanceHead();
            return counts;
        }

        private void advanceHead() {
            headSlot = tailSlot;
            tailSlot = slotAfter(tailSlot);
        }

        private int slotAfter(int slot) {
            return (slot + 1) % windowLengthInSlots;
        }
    }
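To see how counts enter and leave the window, the following self-contained trace re-implements the same steps in compact form (sum all slots, wipe zero-total objects, clear the tail slot, advance the head). SlidingWindowTrace is a hypothetical class written for this illustration, specialized to String keys; it is not the class above.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical compact model of the slot-based sliding window, for tracing only. */
final class SlidingWindowTrace {
    private final Map<String, long[]> counts = new HashMap<>();
    private final int numSlots;
    private int head = 0;
    private int tail = 1;

    SlidingWindowTrace(int numSlots) {
        if (numSlots < 2) {
            throw new IllegalArgumentException("need at least two slots");
        }
        this.numSlots = numSlots;
    }

    void increment(String obj) {
        counts.computeIfAbsent(obj, k -> new long[numSlots])[head]++;
    }

    private static boolean allZero(long[] slots) {
        for (long c : slots) {
            if (c != 0) {
                return false;
            }
        }
        return true;
    }

    Map<String, Long> getCountsThenAdvanceWindow() {
        Map<String, Long> totals = new HashMap<>();
        for (Map.Entry<String, long[]> e : counts.entrySet()) {
            long sum = 0;
            for (long c : e.getValue()) {
                sum += c;
            }
            totals.put(e.getKey(), sum);               // step 1: sum over all slots
        }
        counts.values().removeIf(SlidingWindowTrace::allZero); // step 2: wipe zeros
        for (long[] slots : counts.values()) {
            slots[tail] = 0;                           // step 3a: clear the tail slot
        }
        head = tail;                                   // step 3b: advance the head
        tail = (tail + 1) % numSlots;
        return totals;
    }

    public static void main(String[] args) {
        SlidingWindowTrace window = new SlidingWindowTrace(3);
        int[] hitsPerPeriod = {2, 1, 0, 0, 0, 0}; // occurrences of "a" in each emit period
        for (int hits : hitsPerPeriod) {
            for (int i = 0; i < hits; i++) {
                window.increment("a");
            }
            System.out.println(window.getCountsThenAdvanceWindow());
        }
    }
}
```

With 3 slots, "a" is reported as 2, 3, 3, 1, 0 and then disappears from the map. The total of 0 is still emitted once because wipeZeros runs after the counts have been collected.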
IntermediateRankingsBolt
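This bolt ranks intermediate results. Why add this step? Because the data volume is large; ranking everything directly on a single node would overload it.
So IntermediateRankingsBolt first filters out part of the data.
fieldsGrouping on obj is used here again, which guarantees that the counts emitted for the same obj in different time periods go to the same task.
IntermediateRankingsBolt extends AbstractRankerBolt (see below)
and implements updateRankingsWithTuple:
    void updateRankingsWithTuple(Tuple tuple) {
        Rankable rankable = RankableObjectWithFields.from(tuple);
        super.getRankings().updateWith(rankable);
    }
The logic is simple: convert the Tuple into a Rankable and update the Rankings list.
As described under AbstractRankerBolt, the bolt periodically emits the Rankings list.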
Rankable
Besides extending the Comparable interface, Rankable adds the getObject() and getCount() methods.
    public interface Rankable extends Comparable<Rankable> {
        Object getObject();
        long getCount();
    }
RankableObjectWithFields
RankableObjectWithFields implements the Rankable interface.
1. It provides conversion from a Tuple to a RankableObjectWithFields.
A Tuple consists of several fields: the first field is taken as obj, the second as count, and the rest go into List<Object> otherFields.
2. It implements getObject() and getCount() as defined by Rankable.
3. It implements Comparable (compareTo), and also equals.
    public class RankableObjectWithFields implements Rankable
    public static RankableObjectWithFields from(Tuple tuple) {
        List<Object> otherFields = Lists.newArrayList(tuple.getValues());
        Object obj = otherFields.remove(0);
        Long count = (Long) otherFields.remove(0);
        return new RankableObjectWithFields(obj, count, otherFields.toArray());
    }
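The field-splitting convention can be tried out on a plain list of values. The sketch below assumes the tuple values arrive as a List<Object>; RankableFromValues is a hypothetical class for illustration and does not implement the real Rankable interface.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration of splitting tuple values into obj, count and the rest. */
final class RankableFromValues {
    final Object obj;
    final long count;
    final List<Object> otherFields;

    private RankableFromValues(Object obj, long count, List<Object> otherFields) {
        this.obj = obj;
        this.count = count;
        this.otherFields = otherFields;
    }

    static RankableFromValues from(List<Object> tupleValues) {
        List<Object> rest = new ArrayList<>(tupleValues); // copy, so the input stays untouched
        Object obj = rest.remove(0);                      // first field: the ranked object
        long count = (Long) rest.remove(0);               // second field: its count
        return new RankableFromValues(obj, count, rest);  // whatever is left: other fields
    }

    public static void main(String[] args) {
        // Shape of the counter bolt's output: (obj, count, actualWindowLengthInSeconds)
        RankableFromValues r = from(Arrays.<Object>asList("storm", 42L, 9));
        System.out.println(r.obj + " " + r.count + " " + r.otherFields);
    }
}
```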
Rankings
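Rankings maintains the list to be ranked and provides the corresponding operations on it.
The core data structure is a list that stores the Rankable objects:
    List<Rankable> rankedItems = Lists.newArrayList();
It provides a few simple operations, such as setting the max size (the list size) and getRankings (which returns rankedItems, the ranked list).
The core operation is:
    public void updateWith(Rankable r) {
        addOrReplace(r);
        rerank();
        shrinkRankingsIfNeeded();
    }
1. Replace an existing Rankable object or add a new one (each holds obj and count).
2. Re-sort the list (Collections.sort).
3. Since only the top N are needed, remove anything beyond maxSize.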
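The three steps can be sketched with the standard library alone. RankingsSketch and its nested Entry class are hypothetical names, and sorting by descending count is an assumption about the ordering; the source only says the list is re-sorted.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical stdlib-only model of a bounded top-N list. */
final class RankingsSketch {

    static final class Entry {
        final String obj;
        final long count;

        Entry(String obj, long count) {
            this.obj = obj;
            this.count = count;
        }
    }

    private final int maxSize;
    private final List<Entry> ranked = new ArrayList<>();

    RankingsSketch(int maxSize) {
        this.maxSize = maxSize;
    }

    void updateWith(String obj, long count) {
        // 1. add or replace: drop any stale entry for this obj, then add the new one
        ranked.removeIf(e -> e.obj.equals(obj));
        ranked.add(new Entry(obj, count));
        // 2. re-rank: highest count first (assumed ordering)
        ranked.sort(Comparator.comparingLong((Entry e) -> e.count).reversed());
        // 3. shrink: keep only the top maxSize entries
        while (ranked.size() > maxSize) {
            ranked.remove(ranked.size() - 1);
        }
    }

    List<String> objects() {
        List<String> result = new ArrayList<>();
        for (Entry e : ranked) {
            result.add(e.obj);
        }
        return result;
    }

    public static void main(String[] args) {
        RankingsSketch top2 = new RankingsSketch(2);
        top2.updateWith("a", 5);
        top2.updateWith("b", 7);
        top2.updateWith("c", 1); // falls off the end: only the top 2 are kept
        top2.updateWith("a", 9); // replaces the old entry for "a" and moves it up
        System.out.println(top2.objects());
    }
}
```

Re-sorting the whole list on every update is acceptable here because the list never grows beyond maxSize + 1 entries.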
AbstractRankerBolt
First, a Rankings object is created with topN as its parameter:
    private final Rankings rankings;

    public AbstractRankerBolt(int topN, int emitFrequencyInSeconds) {
        count = topN;
        this.emitFrequencyInSeconds = emitFrequencyInSeconds;
        rankings = new Rankings(count);
    }
execute also triggers the emit on a timer; again, the tick tuple is configured through emitFrequencyInSeconds.
In the normal case it just keeps updating Rankings through updateRankingsWithTuple.
updateRankingsWithTuple is an abstract method here; subclasses must override it with the concrete update logic.
    public final void execute(Tuple tuple, BasicOutputCollector collector) {
        if (TupleHelpers.isTickTuple(tuple)) {
            emitRankings(collector);
        } else {
            updateRankingsWithTuple(tuple);
        }
    }

    private void emitRankings(BasicOutputCollector collector) {
        collector.emit(new Values(rankings));
        getLogger().info("Rankings: " + rankings);
    }
TotalRankingsBolt
This bolt uses globalGrouping, meaning all data is sent to a single task for the final ranking.
TotalRankingsBolt also extends AbstractRankerBolt:
    void updateRankingsWithTuple(Tuple tuple) {
        Rankings rankingsToBeMerged = (Rankings) tuple.getValue(0);
        super.getRankings().updateWith(rankingsToBeMerged);
    }
The final result is the global top-N Rankings list.
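The final merge can be sketched as combining several partial top-N maps into one global list. TotalRankingSketch and mergeTopN are hypothetical names, and plain maps stand in for the Rankings objects. The sketch relies on each obj appearing in at most one partial ranking, which the fieldsGrouping on obj in front of the intermediate rankers guarantees.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of merging intermediate rankings into the global top N. */
final class TotalRankingSketch {

    static List<Map.Entry<String, Long>> mergeTopN(int topN, List<Map<String, Long>> partials) {
        Map<String, Long> merged = new HashMap<>();
        for (Map<String, Long> partial : partials) {
            merged.putAll(partial); // keys do not collide: one obj, one intermediate task
        }
        List<Map.Entry<String, Long>> all = new ArrayList<>(merged.entrySet());
        all.sort(Map.Entry.<String, Long>comparingByValue().reversed()); // highest count first
        return new ArrayList<>(all.subList(0, Math.min(topN, all.size())));
    }

    public static void main(String[] args) {
        Map<String, Long> fromTask0 = new HashMap<>();
        fromTask0.put("a", 5L);
        fromTask0.put("b", 2L);
        Map<String, Long> fromTask1 = new HashMap<>();
        fromTask1.put("c", 9L);
        fromTask1.put("d", 1L);

        List<Map<String, Long>> partials = new ArrayList<>();
        partials.add(fromTask0);
        partials.add(fromTask1);
        System.out.println(mergeTopN(2, partials));
    }
}
```

Because every intermediate ranker has already cut its list down to N entries, the single final task only ever has to sort a small number of candidates.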