Storm拓撲的並行度(parallelism)介紹
1、Storm分為3個主要實體,用於在Storm集群中運行拓撲
工作進程:Worker Process,也稱為Worker
執行器:Executor,即線程Thread
任務:Task
工作進程、執行器、任務三者之間關系如下圖:
Topology由一個或多個Spout/Bolt組件構成。
運行中的Topology由一個或多個Supervisor節點中的Worker構成。默認情況下一個Supervisor節點運行4個Worker,由defaults.yaml/storm.yaml中的屬性決定:
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
在代碼中可以使用new Config().setNumWorkers(3),最大數量不能超過配置的supervisor.slots.ports數量。
Worker為特定拓撲的一個或多個組件Spout/Bolt產生一個或多個Executor。默認情況下一個Worker運行一個Executor。
Executor為特定拓撲的一個或多個組件Spout/Bolt實例運行一個或多個Task。默認情況下一個Executor運行一個Task。
Task執行真正的數據處理,代碼中實現的每個Spout/bolt作為很多任務跨集群執行。一個Spout/Bolt組件的Task數量始終貫穿Topology的整個生命周期,但一個Spout/Bolt組件的Executor數量會隨着時間而改變。這意味着Threads≤Tasks條件成立。默認情況下Task數量與Executor數量相同,即Storm會使用每個Executor運行一個Task。
2、配置拓撲的並行度
工作進程Worker數量
Config config = new Config();
config.setNumWorkers(3); //注意此參數不能大於supervisor.slots.ports數量。
執行器Executor數量
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(id, spout, parallelism_hint); //設置Spout的Executor數量參數parallelism_hint
builder.setBolt(id, bolt, parallelism_hint); //設置Bolt的Executor數量參數parallelism_hint
任務Task數量
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(id, spout, parallelism_hint).setNumTasks(val);; //設置Spout的Executor數量參數parallelism_hint,Task數量參數val
builder.setBolt(id, bolt, parallelism_hint).setNumTasks(val); //設置Bolt的Executor數量參數parallelism_hint,Task數量參數val
3、改變運行中拓撲的並行度
Storm一個很好的特性是可以增加或減少工作進程Worker和Executor的數量而不需要重啟集群或拓撲,這樣的行為成為再平衡(rebalancing)。目前有兩種方式可實現拓撲再平衡,如下:
使用Storm的WebUI
使用Storm的命令行工具,如下
# 重新配置拓撲
# “myTopology” 拓撲使用5個Worker進程
# “blue-spout” Spout使用3個Executor
# “yellow-blot” Bolt使用10個Executor
storm rebalance myTopology -n 5 -e blue-spout=3 -e yellow-blot=10