歡迎轉載,轉載請注明出處,徽滬一郎。
TridentTopology是storm提供的高層使用接口,常見的一些SQL中的操作在tridenttopology提供的api中都有類似的影射。關於TridentTopology的使用及運行原理,當前進行詳細分析的文章不多。
從TridentTopology到vanilla topology(普通的topology)由三個層次組成:
- 面向最終用戶的概念stream, operation
- 利用planner將tridenttopology轉換成vanilla topology
- 執行vanilla topology
本文嘗試TridentTopology是如何先一步步轉換成普通的storm Topology(即vanila topology), 轉換后的topology的執行中有哪些區別?
概述
從TridentTopology到基本的Topology有三層,下圖給出一個全局的視圖。
創建TridentTopology
下面的代碼摘自StormStarter中的TridentWordCount.java
TridentTopology topology = new TridentTopology(); topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"), new Split(), new Fields("word")).groupBy(new Fields("word")).persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count")).parallelismHint(16); return topology.build();
上述代碼的newStream一行,分兩大部分,一是使用newStream來創建一個stream對象,然后針對該Stream進行各種操作,each/shuffle/persistentAggregate等就是各種operation.
用戶在使用TridentTopology的時候,只需要熟悉Stream和TridentTopology中的API函數即可。
轉換TridentTopology為Vanilla Topology
上一節創建了Stream,但是如何將其與原有的Spout及Bolt聯系起來呢?問題的關鍵就在TridentTopology::build函數和TridentTopologyBuilder::buildTopology
TridentTopology::build
newStream及其后的函數調用創建了一個含有三大類節點的List,利用該List創建了一個有向非循環圖(DAG)。這三類節點分別是operation, partition, spout,在build函數將節點分類分別加入到boltNodes或spoutNodes,注意此處的spout或bolt不能等同於普通的spout和bolt.
TridentTopologyBuilder::buildTopology
利用在build函數中創建的boltNodes,spoutNodes及生成的graph來創建vanilla topology所需要的bolt及spout.
在buildTopology中會看到類似的代碼片段。
builder.setBolt(spoutCoordinator(id), new TridentSpoutCoordinator(c.commitStateId, (ITridentSpout) c.spout)) .globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.BATCH_STREAM_ID) .globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.SUCCESS_STREAM_ID);
builder.setSpout(masterCoordinator(batch), new MasterBatchCoordinator(commitIds, batchesToSpouts.get(batch)));
for(String b: c.committerBatches) { specs.get(b).commitStream = new GlobalStreamId(masterCoordinator(b), MasterBatchCoordinator.COMMIT_STREAM_ID); } BoltDeclarer d = builder.setBolt(id, new TridentBoltExecutor(c.bolt, batchIdsForBolts, specs), c.parallelism);
最終生成的普通Topology,與普通Topology中的Spout相對應的是MasterBatchCoordinator,而在創建TridentTopology使用的spout則成了Bolt,使用於Stream上的各種Operation也存在於多個普通Bolt中。
TridentTopology的執行
TridentTopology被轉換為普通的Topology(vanilla Topology)之后提交到nimbus,它的具體執行過程有什么不同呢?
主要有幾點:
- MasterBatchCoordinator通過Batch_stream_id來發送通知給TridentSpoutExecutor
- TridentSpoutExecutor收到通知發送成批的tuple給下一跳的Bolt
- 下一跳的Bolt收到tuple之后,使用TridentBoltExecutor來進行處理
- TridentBoltExecutor調用SubtopologyBolt::execute
- InitialReceiver::execute被調用
- TridentProcessor::execute被調用
MasterBatchCoordinator收到ack之后,會發送success消息給Spout
MasterBatchCoordinator在commit的時候,會發送commit消息給Spout,讓Spout將緩存的消息刪除