正文前先來一波福利推薦:
福利一:
百萬年薪架構師視頻,該視頻可以學到很多東西,是本人花錢買的VIP課程,學習消化了一年,為了支持一下女朋友公眾號也方便大家學習,共享給大家。
福利二:
畢業答辯以及工作上各種答辯,平時積累了不少精品PPT,現在共享給大家,大大小小加起來有幾千套,總有適合你的一款,很多是網上是下載不到。
獲取方式:
微信關注 精品3分鍾 ,id為 jingpin3mins,關注后回復 百萬年薪架構師 ,精品收藏PPT 獲取雲盤鏈接,謝謝大家支持!
------------------------正文開始---------------------------
問題引入:
線上最近的數據量越來越大,出現了數據處理延遲的現象,觀察storm ui的各項數據,發現有大量的spout失敗的情況,如下:
----------------------------------------------------------------------------------------------------------------------------------------------------------------
然后根據storm並發度的一些理論,進行一些參數的配置調整:
Storm的並行度是非常重要的,通過提高並行度可以提高storm程序的計算能力。
那strom是如何提高並行度的呢?
Strom程序的執行是由多個supervisor共同執行的。
supervisor運行的是topology中的spout/bolt task
task 是storm中進行計算的最小的運行單位,表示是spout或者bolt的運行實例。
程序執行的最大粒度的運行單位是進程,剛才說的task也是需要有進程來運行它的,在supervisor中,運行task的進程稱為worker,
Supervisor節點上可以運行非常多的worker進程,一般在一個進程中是可以啟動多個線程的,所以我們可以在worker中運行多個線程,這些線程稱為executor,在executor中運行task。
這樣的話就可以提高strom的計算能力。
總結一下:worker>executor>task
要想提高storm的並行度可以從三個方面來改造
worker(進程)>executor(線程)>task(實例)
增加work進程,增加executor線程,增加task實例
worker的設置:
這表示是一個work進程,其實就是一個jvm虛擬機進程,在這個work進程里面有多個executor線程,每個executor線程會運行一個或多個task實例。一個task是最終完成數據處理的實體單元。(默認情況下一個executor運行一個task)
worker,executor,task解釋:
1個worker進程執行的是1個topology的子集(注:不會出現1個worker為多個topology服務)。
1個worker進程會啟動1個或多個executor線程來執行1個topology的component(spout或bolt)。因此,1個運行中的topology就是由集群中多台物理機上的多個worker進程組成的。
executor是1個被worker進程啟動的單獨線程。每個executor只會運行1個topology的1個component(spout或bolt)的task(注:task可以是1個或多個,storm默認是1個component只生成1個task,executor線程里會在每次循環里順序調用所有task實例)。
task是最終運行spout或bolt中代碼的單元(注:1個task即為spout或bolt的1個實例,executor線程在執行期間會調用該task的nextTuple或execute方法)。topology啟動后,1個component(spout或bolt)的task數目是固定不變的,但該component使用的executor線程數可以動態調整(例如:1個executor線程可以執行該component的1個或多個task實例)。這意味着,對於1個component存在這樣的條件:#threads<=#tasks(即:線程數小於等於task數目)。默認情況下task的數目等於executor線程數目,即1個executor線程只運行1個task。
剛才從理論說明了如何提高集群的並行度,在這里我們就來看一下這些東西worker(進程)>executor(線程)>task(實例) 是如何設置的:
l worker(進程):這個worker進程數量是在集群啟動之前配置好的,在哪配置的呢?是在storm/conf/storm.yaml文件中,參數是supervisor.slots.port,如果我們不在這進行配置的話,這個參數也是有默認值的,在strom-0.9.3的壓縮包中的lib目錄下,有一個strom-core.jar,打開這個jar文件,在里面有一個defaults.yaml文件中是有一些默認配置的。
默認情況下一個storm項目只使用一個work進程,也可以通過代碼進行修改,通過config.setNumWorkers(workers)設置。
注意:如果worker使用完的話再提交topology就不會執行,因為沒有可用的worker,只能處於等待狀態,把之前運行的topology停止一個之后這個就會繼續執行了,
這里項目中存在3個脫坡,兩個worker設置為20;另外一個數據量大的設置worker數為40;相當於等於線上機器的CPU核數;(注意:我的storm ui上的slots總數為160,但是我沒有把worker數設置的更大,我的考慮是如果設置大於CPU核數,有可能反而會影響其性能,所以最終設置每個拓撲中的worker數最大不超過40,此處不一定設置大於40要不好,有了解的可以留言討論一下);
下面以worker數為20的這個拓撲來進行分析:
將超時時間由原來的30擴大到600;最大的spout緩存設置為1000*spout數=20000;ack的數設置為20(ack的個數要保持與worker一樣,因為每個worker會創建一個executor來處理ack,)
executor(線程):
executor(線程):默認情況下一個executor運行一個task,可以通過在代碼中設置builder.setSpout(id,spout, parallelism_hint);或者builder.setBolt(id,bolt,parallelism_hint);來提高線程數的。
task(實例):通過boltDeclarer.setNumTasks(num);來設置實例的個數
默認情況下,一個supervisor節點會啟動4個worker進程。每個worker進程會啟動1個executor,每個executor啟動1個task。
Ok,這幾個參數都可以使用一些方法進行增加。
這里設置spout的executor個數為20個,task個數為20個,然后bolt的executor個數設置為120,task設置為120,因為bolt進行數據處理,需要連接redis存儲,設置多個線程執行,充分發揮多核CPU性能;
下面來看一下對這些配置修改之后的效果
從ui的顯示來看,發現不在有failed出現,沒有failed的原因是
這三個參數起了效果,complete latency 的時間是45s,小於我們設置的600,在設置時間可以得到處理,不會有超時failed問題;
但是 發現Complete latency的時間比優化之前降低了,原因應該是我把executor和tasks的數值增大了,由原來的16增大到20,處理的吞度量增大,吞吐量和這個參數成反比;所以增大吞吐量可以增大executor和tasks的值;
下面看另一個問題:
在代碼中設置使用20個worker,查看ui界面,發現workers是20個,executors設置了130個,為什么顯示executor為150呢?
因為每一個worker默認都會占用一個executor(這個executor會啟動一個acker任務),這樣就會占用20個,一共 10 + 120 + 20 = 150。
Acker任務默認是每個worker進程啟動一個executor線程來執行,,可以在topology中取消acker任務,這樣的話就不會多出來一個executor和任務了。
同樣task也是這個道理;
注意:除去worker占用外,只有設置足夠多的線程和實例才可以真正的提高並行度。
在這設置多個實例主要是為了下面執行rebalance的時候用到,因為rebalance不需要修改代碼,就可以動態修改topology的並行度,這樣的話就必須提前配置好多個實例,在rebalance的時候主要是對之前設置多余的任務實例分配線程去執行。
在命令行動態修改並行度
除了使用代碼進行調整,還可以在shell命令行下對並行度進行調整。
storm rebalance mytopology -w 10 -n 2 -e spout=2 -e bolt=2
表示 10秒之后對mytopology進行並行度調整。把spout調整為2個executor,把bolt調整為2個executor
注意:並行度主要就是調整executor的數量,但是調整之后的executor的數量必須小於等於task的數量,如果分配的executor的線程數比task數量多的話也只能分配和task數量相等的executor。
經過多次試驗總結,得出如下結論:
1)Topology的worker數通過config設置,即執行該topology的worker(java)進程數。它可以通過storm rebalance 命令任意調整。
2) Topology中某個bolt的executor數,即parallelismNum,即執行該bolt的線程數,在setBolt時由第三個參數指定。它可以通過storm rebalance 命令調整,但最大不能超過該bolt的task數;
3) bolt的task數,通過setNumTasks()設置。(也可不設置,默認取bolt的executor數),無法在運行時調整。
4)Bolt實例數,這個比較特別,它和task數相等。有多少個task就會new 多少個Bolt對象。而這些Bolt對象在運行時由Bolt的thread進行調度。也即是說