Apache Storm技術實戰之3 -- TridentWordCount


歡迎轉載,轉載請注明出處。

介紹TridentTopology的使用,重點分析newDRPCStream和stateQuery的實現機理。

使用TridentTopology進行數據處理的時候,經常會使用State來保存一些狀態,這些保存起來的State通過stateQuery來進行查詢。問題恰恰在這里產生,即對state進行更新的Stream和爾后進行stateQuery的Stream並非同一個,那么它們之間是如何關聯起來的呢。

在TridentTopology中,有一些Processor可能會同處於一個Bolt中,這些Processor形成一個processing chain, 那么Tuple又是如何在這些Processor之間進行傳遞的呢。

TridentWordCount

編譯和運行

lein compile storm.starter.trident.TridentWordCount
java -cp $(lein classpath) storm.starter.trident.TridentWordCount 

main函數

public static void main(String[] args) throws Exception {
    Config conf = new Config();
    conf.setMaxSpoutPending(20);
    if (args.length == 0) {
      LocalDRPC drpc = new LocalDRPC();
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("wordCounter", conf, buildTopology(drpc));
      for (int i = 0; i < 100; i++) {
        System.out.println("DRPC RESULT: " + drpc.execute("words", "cat the dog jumped"));
        Thread.sleep(1000);
      }
    }
    else {
      conf.setNumWorkers(3);
      StormSubmitter.submitTopology(args[0], conf, buildTopology(null));
    }
  } 

buildTopology

 public static StormTopology buildTopology(LocalDRPC drpc) {
    FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),
        new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),
        new Values("how many apples can you eat"), new Values("to be or not to be the person"));
    spout.setCycle(true);

    TridentTopology topology = new TridentTopology();
    TridentState wordCounts = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"),
        new Split(), new Fields("word")).groupBy(new Fields("word")).persistentAggregate(new MemoryMapState.Factory(),
        new Count(), new Fields("count")).parallelismHint(16);

    topology.newDRPCStream("words", drpc).each(new Fields("args"), new Split(), new Fields("word")).groupBy(new Fields(
        "word")).stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count")).each(new Fields("count"),
        new FilterNull()).aggregate(new Fields("count"), new Sum(), new Fields("sum"));
    return topology.build();
  } 

示意圖

 

在整個topology中,有兩個不同的spout。

運行結果該如何理解

此圖有好幾個問題

  1. PartitionPersistProcessor和StateQueryProcessor同處於一個bolt,該bolt為SubtopologyBolt
  2. SubtopologyBolt有來自多個不同Stream的輸入,根據不同的Streamid找到對應的InitialReceiver
  3. drpcspout在執行的時候,是一直不停的emit消息到SubtopologyBolt,還是發送完一次消息就停止發送

不同的tuple,其sourcestream不一樣,根據SourceStream,找到對應的InitialReceiver

    Map<String, InitialReceiver> _roots = new HashMap(); 

 

狀態更新

進行狀態更新的Processor名為PartitionPersistProcessor

execute

記錄哪些tuple需要進行狀態更新

finishBatch

狀態真正更新是發生在finishBatch階段

persistentAggregate

PartitionPersistProcessor

  • SubtopologyBolt::execute
    • PartitionPersistProcessor::finishBatch
      •   _updater::updateState
        • Snapshottable::update

當狀態更新的時候,狀態查詢是否會發生?

狀態查詢

進行狀態查詢的Processor名為StateQueryProcessor

execute

finishBatch

查詢的時候,首先調用batchRetrieive來獲得最新的狀態更新結果,再對每個最新的結果使用_function來進行處理。

調用層次

  • SubtopologyBolt::finishBatch
    •   StateQueryProcessor::finishBatch
      • _function.batchRetrieve
      • _function.execute   將處理過的結果發送給下一跳進行處理

         

消息的傳遞

TridentTuple

如何決定bolt內部的哪個processor來處理接收到的消息,這個是根據不同的Stream來判斷InitialReceiver完成。

當SubtopologyBolt接收到最原始的tuple時,根據streamid找到InitialReceiver后,InitialReceiver在receive函數中作的第一件事情就是根據tuple來創建一個tridenttuple,tridenttuple會被處在同一個SubtopologyBolt中的processor一一處理,處理的結果是保存在tridenttuple和processorcontext中。

ProcessorContext

ProcessorContext記錄兩個重要的信息,即當前的batchId和batchState.

public class ProcessorContext {
    public Object batchId;
    public Object[] state;
    
    public ProcessorContext(Object batchId, Object[] state) {
        this.batchId = batchId;
        this.state = state;
    }
}

TridentCollector

tridentcollector在emit的時候將消息由各個TupleReceiver進行處理。目前僅有BridgeReceiver實現了該接口。

BridgeReceiver負責將消息發送給另外的Bolt進行處理。這里說的“另外的Bolt”是指Vanilla Topology中的Bolt.

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM