1、默認情況下:
1個supervisor節點啟動4個worker進程。
每一個topology默認占用一個worker進程。
每個worker會啟動executor。
每個executor默認啟動一個task。
2、並行度
什么是並行度?在 Storm 的設定里,並行度大體分為3個方面:
- 一個 topology 指定多少個 worker 進程並行運行;
- 一個 worker 進程指定多少個 executor 線程並行運行;
- 一個 executor 線程指定多少個 task 並行運行。
一般來說,並行度設置越高,topology 運行的效率就越高,但是也不能一股腦地給出一個很高的值,還得考慮給每個 worker 分配的內存的大小,還得平衡系統的硬件資源,以避免浪費。
Storm 集群可以運行一個或多個 topology,而每個 topology 包含一個或多個 worker 進程,每個 worer 進程可派生一個或多個 executor 線程,而每個 executor 線程則派生一個或多個 task,task 是實際的數據處理單元,也是 Storm 概念里最小的工作單元, spout 或 bolt 的實例便是由 task 承載。
3、ACK/Fail
上文說到,Storm 保證了數據不會丟失,ack/fail 機制便是實現此機制的法寶。Storm 在內部構建了一個 tuple tree 來表示每一個 tuple 的流向,當一個 tuple 被 spout 發射給下游 bolt 時,默認會帶上一個 messageId,可以由代碼指定但默認是自動生成的,當下游的 bolt 成功處理 tuple 后,會通過 acker 進程通知 spout 調用 ack 方法,當處理超時或處理失敗,則會調用 fail 方法。當 fail 方法被調用,消息可能被重發,具體取決於重發策略的配置,和所使用的 spout。
對於一個消息,Storm 提出了『完全處理』的概念。即一個消息是否被完全處理,取決於這個消息是否被 tuple tree 里的每一個 bolt 完全處理,當 tuple tree 中的所有 bolt 都完全處理了這條消息后,才會通知 acker 進程並調用該消息的原始發射 spout 的 ack 方法,否則會調用 fail 方法。
ack/fail 只能由創建該 tuple 的 task 所承載的 spout 觸發
默認情況下,Storm 會在每個 worker 進程里面啟動1個 acker 線程,以為 spout/bolt 提供 ack/fail 服務,該線程通常不太耗費資源,因此也無須配置過多,大多數情況下1個就足夠了。
4、work通信
一個 worker 進程裝配了如下幾個元件:
- 一個 receive 線程,該線程維護了一個 ArrayList,負責接收其他 worker 的 sent 線程發送過來的數據,並將數據存儲到 ArrayList 中。數據首先存入 receive 線程的一個緩沖區,可通過 topology.receiver.buffer.size (此項配置在 Storm 1.0 版本以后被刪除了)來配置該緩沖區存儲消息的最大數量,默認為8(個數,並且得是2的倍數),然后才被推送到 ArrayList 中。receive 線程接收數據,是通過監聽 TCP的端口,該端口有 storm 配置文件中 supervisor.slots.prots 來配置,比如 6700;
- 一個 sent 線程,該線程維護了一個消息隊列,負責將隊里中的消息發送給其他 worker 的 receive 線程。同樣具有緩沖區,可通過 topology.transfer.buffer.size 來配置緩沖區存儲消息的最大數量,默認為1024(個數,並且得是2的倍數)。當消息達到此閾值時,便會被發送到 receive 線程中。sent 線程發送數據,是通過一個隨機分配的TCP端口來進行的。
- 一個或多個 executor 線程。executor 內部同樣擁有一個 receive buffer 和一個 sent buffer,其中 receive buffer 接收來自 receive 線程的的數據,sent buffer 向 sent 線程發送數據;而 task 線程則介於 receive buffer 和 sent buffer 之間。receive buffer 的大小可通過 Conf.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE 參數配置,sent buffer 的大小可通過 Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE 配置,兩個參數默認都是 1024(個數,並且得是2的倍數)。
5、性能調優
合理的配置並行度
有幾個手段可以配置 topology 的並行度:
- conf.setNumWorkers() 配置 worker 的數量
- builder.setBolt("NAME", new Bolt(), 並行度) 設置 executor 數量
- spout/bolt.setNumTask() 設置 spout/bolt 的 task 數量
那么問題是:
- setNumWorkers 應該取多少?取決於哪些因素?
- kafkaSpout 的並行度應該取多少?取決於哪些因素?
- FilterBolt 的並行度應該取多少?取決於哪些因素?
- AlertBolt 的並行度應該取多少?取決於哪些因素?
- FilterBolt 用 shuffleGrouping 是最好的嗎?
- AlertBolt 用 fieldsGrouping 是最好的嗎?
回答如下:
第一個問題:關於 worker 的並行度:worker 可以分配到不同的 supervisor 節點,這也是 Storm 實現多節點並行計算的主要配置手段。據此, workers 的數量,可以說是越多越好,但也不能造成浪費,而且也要看硬件資源是否足夠。所以主要考慮集群各節點的內存情況:默認情況下,一個 worker 分配 768M 的內存,外加 64M 給 logwriter 進程;因此一個 worker 會耗費 832M 內存;題設的集群有3個節點,每個節點4G內存,除去 linux 系統、kafka、zookeeper 等的消耗,保守估計僅有2G內存可用來運行 topology,由此可知,當集群只有一個 topology 在運行的情況下,最多可以配置6個 worker。
另外,我們還可以調節 worker 的內存空間。這取決於流過 topology 的數據量的大小以及各 bolt 單元的業務代碼的執行時間。如果數據量特別大,代碼執行時間較長,那么可以考慮增加單個 worker 的工作內存。有一點需要注意的是,一個 worker 下的所有 executor 和 task 都是共享這個 worker 的內存的,也就是假如一個 worker 分配了 768M 內存,3個 executor,6個 task,那么這個 3 executor 和 6 task 其實是共用這 768M 內存的,但是好處是可以充分利用多核 CPU 的運算性能。
總結起來,worker 的數量,取值因素有:
- 節點數量,及其內存容量
- 數據量的大小和代碼執行時間
機器的CPU、帶寬、磁盤性能等也會對 Storm 性能有影響,但是這些外在因素一般不影響 worker 數量的決策。
需要注意的是,Storm 在默認情況下,每個 supervisor 節點只允許最多4個 worker(slot)進程運行;如果所配置的 worker 數量超過這個限制,則需要在 storm 配置文件中修改。
第二個問題:關於 FilterBolt 的並行度:如果 spout 讀取的是 kafka 的數據,那么正常情況下,設置為 topic 的分區數量即可。計算 kafkaSpout 的最佳取值,有一個最簡單的辦法,就是在 Storm UI里面,點開 topology 的首頁,在 Spouts (All time) 下,查看以下幾個參數的值:
- Emitted 已發射出去的tuple數
- Transferred 已轉移到下一個bolt的tuple數
- Complete latency (ms) 每個tuple在tuple tree中完全處理所花費的平均時間
- Acked 成功處理的tuple數
- Failed 處理失敗或超時的tuple數

怎么看這幾個參數呢?有幾個技巧:
- 正常情況下 Failed 值為0,如果不為0,考慮增加該 spout 的並行度。這是最重要的一個判斷依據;
- 正常情況下,Emitted、Transferred和Acked這三個值應該是相等或大致相等的,如果相差太遠,要么該 spout 負載太重,要么下游負載過重,需要調節該 spout 的並行度,或下游 bolt 的並行度;
- Complete latency (ms) 時間,如果很長,十秒以上就已經算很長的了。當然具體時間取決於代碼邏輯,bolt 的結構,機器的性能等。
kafka 只能保證同一分區下消息的順序性,當 spout 配置了多個 executor 的時候,不同分區的消息會均勻的分發到不同的 executor 上消費,那么消息的整體順序性就難以保證了,除非將 spout 並行度設為 1
第三個問題:關於 FilterBolt 的並行度:其取值也有一個簡單辦法,就是在 Storm UI里面,點開 topology 的首頁,在 Bolts (All time) 下,查看以下幾個參數的值:
- Capacity (last 10m) 取值越小越好,當接近1的時候,說明負載很嚴重,需要增加並行度,正常是在 0.0x 到 0.1 0.2 左右
- Process latency (ms) 單個 tuple 的平均處理時間,越小越好,正常也是 0.0x 級別;如果很大,可以考慮增加並行度,但主要以 Capacity 為准

一般情況下,按照該 bolt 的代碼時間復雜度,設置一個 spout 並行度的 1-3倍即可。
第四個問題:AlertBolt 的並行度同 FilterBolt。
第五個問題:shuffleGrouping 會將 tuple 均勻地隨機分發給下游 bolt,一般情況下用它就是最好的了。
總之,要找出並行度的最佳取值,主要結合 Storm UI 來做決策。
優化配置參數:
/** tuple發送失敗重試策略,一般情況下不需要調整 */ spoutConfig.retryInitialDelayMs = 0; spoutConfig.retryDelayMultiplier = 1.0; spoutConfig.retryDelayMaxMs = 60 * 1000; /** 此參數比較重要,可適當調大一點 */ /** 通常情況下 spout 的發射速度會快於下游的 bolt 的消費速度,當下游的 bolt 還有 TOPOLOGY_MAX_SPOUT_PENDING 個 tuple 沒有消費完時,spout 會停下來等待,該配置作用於 spout 的每個 task。 */ conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 10000) /** 調整分配給每個 worker 的內存,關於內存的調節,上文已有描述 */ conf.put(Config.WORKER_HEAP_MEMORY_MB, 768); conf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 768); /** 調整 worker 間通信相關的緩沖參數,以下是一種推薦的配置 */ conf.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8); // 1.0 以上已移除 conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 32); conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384); conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
6、rebalance
可以直接采用 rebalance 命令(也可以在 Storm UI上操作)重新配置 topology 的並行度:
storm rebalance TOPOLOGY-NAME -n 5 -e SPOUT/BOLT1-NAME=3 -e SPOUT/BOLT2-NAME=10
7、GC參數優化:
可以對每個worker的java內存參數進行調整,配置在conf/storm.yaml文件中,
可以通過在Strom的配置文件storm.yaml中設置worker的啟動參數:
worker.childopts: "-Xmx2048m"
該參數會在啟動時傳遞給JVM,然后就可以在worker中使用2048m內存了。
一個比較簡單的啟用CMS的GC配置可以為:
也可以在worker.childopts的參數中加入打印GC的日志進行GC性能的優化。
-XX:+PrintGCDetails -Xloggc:d:\gc.log
8、其他建議
1、優先使用localOrShuffleGrouping
2、注意fieldsGrouping 的數據均衡性
3、KafkaBolt批量提交
4、使用組件的並行度代替線程池
5、kryo序列化
轉自:https://www.cnblogs.com/peak-c/p/6297794.html
https://blog.csdn.net/u013063153/article/details/74177721