twitter storm源碼走讀之6 -- Trident Topology執行過程分析


歡迎轉載,轉載請注明出處,徽滬一郎。

TridentTopology是storm提供的高層使用接口,常見的一些SQL中的操作在tridenttopology提供的api中都有類似的影射。關於TridentTopology的使用及運行原理,當前進行詳細分析的文章不多。

從TridentTopology到vanilla topology(普通的topology)由三個層次組成:

  1. 面向最終用戶的概念stream, operation
  2. 利用planner將tridenttopology轉換成vanilla topology
  3. 執行vanilla topology

本文嘗試TridentTopology是如何先一步步轉換成普通的storm Topology(即vanila topology), 轉換后的topology的執行中有哪些區別?

 

概述

從TridentTopology到基本的Topology有三層,下圖給出一個全局的視圖。

創建TridentTopology

下面的代碼摘自StormStarter中的TridentWordCount.java

 

    TridentTopology topology = new TridentTopology();
    topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"),
        new Split(), new Fields("word")).groupBy(new Fields("word")).persistentAggregate(new MemoryMapState.Factory(),
        new Count(), new Fields("count")).parallelismHint(16);

    return topology.build();

 

上述代碼的newStream一行,分兩大部分,一是使用newStream來創建一個stream對象,然后針對該Stream進行各種操作,each/shuffle/persistentAggregate等就是各種operation.

用戶在使用TridentTopology的時候,只需要熟悉Stream和TridentTopology中的API函數即可。

轉換TridentTopology為Vanilla Topology

上一節創建了Stream,但是如何將其與原有的Spout及Bolt聯系起來呢?問題的關鍵就在TridentTopology::build函數和TridentTopologyBuilder::buildTopology

TridentTopology::build

newStream及其后的函數調用創建了一個含有三大類節點的List,利用該List創建了一個有向非循環圖(DAG)。這三類節點分別是operation, partition, spout,在build函數將節點分類分別加入到boltNodes或spoutNodes,注意此處的spout或bolt不能等同於普通的spout和bolt.

TridentTopologyBuilder::buildTopology

利用在build函數中創建的boltNodes,spoutNodes及生成的graph來創建vanilla topology所需要的bolt及spout.

在buildTopology中會看到類似的代碼片段。

builder.setBolt(spoutCoordinator(id), new TridentSpoutCoordinator(c.commitStateId, (ITridentSpout) c.spout))
                        .globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.BATCH_STREAM_ID)
                        .globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.SUCCESS_STREAM_ID);
builder.setSpout(masterCoordinator(batch), new MasterBatchCoordinator(commitIds, batchesToSpouts.get(batch)));
for(String b: c.committerBatches) {
                specs.get(b).commitStream = new GlobalStreamId(masterCoordinator(b), MasterBatchCoordinator.COMMIT_STREAM_ID);
            }
            
            BoltDeclarer d = builder.setBolt(id, new TridentBoltExecutor(c.bolt, batchIdsForBolts, specs), c.parallelism);

最終生成的普通Topology,與普通Topology中的Spout相對應的是MasterBatchCoordinator,而在創建TridentTopology使用的spout則成了Bolt,使用於Stream上的各種Operation也存在於多個普通Bolt中。

 TridentTopology的執行

TridentTopology被轉換為普通的Topology(vanilla Topology)之后提交到nimbus,它的具體執行過程有什么不同呢?

主要有幾點:

  1. MasterBatchCoordinator通過Batch_stream_id來發送通知給TridentSpoutExecutor
  2. TridentSpoutExecutor收到通知發送成批的tuple給下一跳的Bolt
  3. 下一跳的Bolt收到tuple之后,使用TridentBoltExecutor來進行處理
    1. TridentBoltExecutor調用SubtopologyBolt::execute
    2. InitialReceiver::execute被調用
    3. TridentProcessor::execute被調用

MasterBatchCoordinator收到ack之后,會發送success消息給Spout

MasterBatchCoordinator在commit的時候,會發送commit消息給Spout,讓Spout將緩存的消息刪除

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM