【轉】Storm並行度詳解


1、Storm並行度相關的概念

Storm集群有很多節點,按照類型分為nimbus(主節點)、supervisor(從節點),在conf/storm.yaml中配置了一個supervisor,有多個槽(supervisor.slots.ports),每個槽就是一個JVM,就是一個worker(一個節點,運行一個worker),在每個worker里面可以運行多個線程叫做executor,在executor里運行一個topology的一個component(spout、bolt)叫做task。task  是storm中進行計算的最小的運行單位,表示是spout或者bolt的運行實例。

總結一下,supervisor(節點)>worker(進程)>executor(線程)>task(實例)

程序執行的最大粒度的運行單位是進程,剛才說的task也是需要有進程來運行它的,在supervisor中,運行task的進程稱為worker
Supervisor 節點上可以 運行非常多的worker進程,一般在 一個進程中是可以啟動多個線程的,所以我們可以 在worker中運行多個線程,這些線程 稱為executor在executor中運行task。
 
 
提高storm的並行度,可 考慮如下幾點:
worker(進程)>executor(線程)>task(實例)
增加work進程,增加executor線程,增加task實例
 
看下面的圖:
這表示是一個work進程,其實就是一個 jvm虛擬機進程,在這個work進程里面有多個executor線程,每個executor線程會運行一個或多個task實例。一個task是最終完成數據處理的實體單元。(默認情況下一個executor運行一個task).
 
 
worker,executor,task解釋
 
1個worker進程執行的是1個topology的子集(注: 不會出現1個worker為多個topology服務)。1個worker進程會啟動1個或多個executor線程來執行1個topology的component(spout或bolt)。因此, 1個運行中的topology就是由集群中多台物理機上的多個worker進程組成的
 
executor是1個被worker進程啟動的單獨線程。每個executor只會運行1個topology的1個component(spout或bolt)的task(注:task可以是1個或多個,storm默認是1個component只生成1個task,executor線程里會在每次循環里順序調用所有task實例)。
 
task是最終運行spout或bolt中代碼的單元(注:1個task即為spout或bolt的1個實例,executor線程在執行期間會調用該task的nextTuple或execute方法)。topology啟動后,1個component(spout或bolt)的task數目是固定不變的,但該component使用的executor線程數可以動態調整(例如:1個executor線程可以執行該component的1個或多個task實例)。這意味着,對於1個component存在這樣的條件:#threads<=#tasks(即:線程數小於等於task數目)。默認情況下task的數目等於executor線程數目,即1個executor線程只運行1個task。
 
 
 
 
 
 
 
剛才從理論說明了如何提高集群的並行度,在這里我們就來看一下這些東西worker(進程)>executor(線程)>task(實例) 是如何設置的
l  worker(進程):這個worker進程數量是在集群啟動之前配置好的,在哪配置的呢?是在storm/conf/storm.yaml文件中,參數是supervisor.slots.port,如果我們不在這進行配置的話,這個參數也是有默認值的,在strom-0.9.3的壓縮包中的lib目錄下,有一個strom-core.jar,打開這個jar文件,在里面有一個defaults.yaml文件中是有一些默認配置的。
技術分享
默認情況下一個storm項目只使用一個work進程,也可以通過代碼進行修改,通過config.setNumWorkers(workers)設置。(最好一台機器上的一個topology只使用一個worker,主要原因時減少了worker之間的 數據傳輸)
 
注意:如果worker使用完的話再提交topology就不會執行,因為沒有可用的worker,只能處於等待狀態,把之前運行的topology停止一個之后這個就會繼續執行了,
技術分享
 
l executor(線程):默認情況下一個executor運行一個task,可以通過在代碼中設置builder.setSpout(id,spout, parallelism_hint);或者builder.setBolt(id,bolt,parallelism_hint);來提高線程數的。
 
l task(實例):通過boltDeclarer.setNumTasks(num);來設置實例的個數
 
默認情況下,一個supervisor節點會啟動4個worker進程。每個worker進程會啟動1個executor,每個executor啟動1個task。
 
Ok,這幾個參數都可以使用一些方法進行增加。
 
 
下面來舉個例子看一下對這些配置修改之后的效果
 
l  worker(進程),通過在代碼中設置,可以在ui界面上查看worker的總數,並且還可以在 linux服務器上執行jps查看work進程。
技術分享
在代碼中設置使用3個worker,查看ui界面,發現workers是3個,executors使用了5個,為什么呢?因為每一個worker默認都會占用一個executor(這個executor會啟動一個acker任務),這樣就會占用三個,剩下的兩個是spout和bolt實例占用了。
 
如果使用5個worker,executor會使用7個,因為worker本身就會占用5個,spout和bolt占用兩個。
技術分享
 
 
Acker任務默認是每個worker進程啟動一個executor線程來執行,,可以在topology中取消acker任務,這樣的話就不會多出來一個executor和任務了。
代碼如下:
實際上就是修改一個配置
topology.acker.executors
技術分享
這樣的話在頁面查看就只有兩個executor和2個task了。
 
 
l  executor(線程),在spout和bolt中設置線程數,都設置為2個,查看ui界面
技術分享
現在使用的executor和tasks就是7個了,因為worker本身使用3個,spout和bolt分別使用2個。
技術分享
 
 
l  task(實例),在sum中設置實例個數為5,查看ui界面
技術分享
 
發現ui界面上顯示的tasks是10,因為spout占用2個,bolt占用5個,剩下的3個由acker任務占用
技術分享
 
注意:雖然在這設置了多個task實例,但是並行度並沒有很大提高,因為只有兩個線程去運行這些實例,只有設置足夠多的線程和實例才可以真正的提高並行度。
在這設置多個實例主要是為了下面執行rebalance的時候用到,因為rebalance不需要修改代碼,就可以動態修改topology的並行度,這樣的話就必須提前配置好多個實例,在rebalance的時候主要是對之前設置多余的任務實例分配線程去執行。
 
在命令行動態修改並行度
除了使用代碼進行調整,還可以在shell命令行下對並行度進行調整。
storm rebalance mytopology -w 10 -n 2 -e spout=2 -e bolt=2
表示 10秒之后對mytopology進行並行度調整。把spout調整為2個executor,把bolt調整為2個executor
注意:並行度主要就是調整executor的數量,但是調整之后的executor的數量必須小於等於task的數量,如果分配的executor的線程數比task數量多的話也只能分配和task數量相等的executor。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM