Storm中-Worker Executor Task的關系


Storm在集群上運行一個Topology時,主要通過以下3個實體來完成Topology的執行工作:
1. Worker(進程)
2. Executor(線程)
3. Task

下圖簡要描述了這3者之間的關系:

注:supervisor.slots.ports:對於每個工作節點配置該節點可以運行多少個worker進程。

每個worker進程使用一個但單獨的端口來收取消息,這里配置了哪個端口用來使用。

定義5個端口,那么該節點上允許最多運行5個worker進程。

默認情況下,可以在端口6700, 6701, 6702, 6703四個端口最多運行四個worker進程。

如果我們不在這進行配置的話,這個參數也是有默認值的,有一個strom-core.jar,打開這個jar文件,在里面有一個defaults.yaml文件中是有一些默認配置的。

如下圖:

 了解Storm中的拓撲的並發度

Worker :

一個Worker 進程執行的是一個topology的子集,這里我們必須強調:不會存在一個worker 為多個topology服務,

一個worker進程會啟動一個或則多個executor 線程來執行一個topology的compotent-》也就是Spout或者bolt,

一個topology就是由於集群中間的多台物理機上的Worker構成的

Workers (JVMs): 在一個節點上可以運行一個或多個獨立的JVM 進程(配置多個端口時)。一個Topology可以包含一個或多個worker(並行的跑在不同的machine上), 所以worker process就是執行一個topology的子集, 並且worker只能對應於一個topology;worker processes的數目, 可以通過配置文件和代碼中配置, worker就是執行進程, 所以考慮並發的效果, 數目至少應該大亍machines的數目。

 默認情況下一個storm項目只使用一個work進程,也可以通過代碼進行修改,通過config.setNumWorkers(workers)設置。(最好一台機器上的一個topology只使用一個worker,主要原因時減少了worker之間的數據傳輸)

注意:如果worker使用完的話再提交topology就不會執行,因為沒有可用的worker,只能處於等待狀態,把之前運行的topology停止一個之后這個就會繼續執行了,

Executor:

一個executor是一個被Worker進程啟動的單獨線程,每一個Executor都只會運行一個topology的一個component,

默認情況:一個spout,或則一個bolt都只會生成一個task,Executor線程里會在每次 循環的時候 順序的去調用所有的task的實例子

默認情況:一個executor對應一個task,可以通過配置文件,或者API來設置!

默認情況:一個executor運行一個task,可以通過在代碼中設置builder.setSpout(id,spout, parallelism_hint);或者builder.setBolt(id,bolt,parallelism_hint);來提高線程數的。

Executors (threads): 在一個worker JVM進程中運行着多個Java線程。一個executor線程可以執行一個或多個tasks.

一般默認每個executor只執行一個task。

一個worker可用包含一個或多個executor, 每個component (spout或bolt)至少對應於一個executor, 所以可以說executor執行一個compenent的子集,

同時一個executor只能對應於一個component;executor的數目, component的並發線程數只能在代碼中配置(通過setBolt和setSpout的參數)。

 

task

通過boltDeclarer.setNumTasks(num);來設置實例的個數

默認情況下,一個supervisor節點會啟動4個worker進程。每個worker進程會啟動1個executor,每個executor啟動1個task。

task是最終運行spout或bolt中代碼的單元(注:1個task即為spout或bolt的1個實例,executor線程在執行期間會調用該task的nextTuple或execute方法)。

topology啟動后,1個component(spout或bolt)的task數目是固定不變的,但該component使用的executor線程數可以動態調整(例如:1個executor線程可以執行該component的1個或多個task實例)。這意味着,對於1個component存在這樣的條件:#threads<=#tasks(即:線程數小於等於task數目)。

默認情況下task的數目等於executor線程數目,即1個executor線程只運行1個task

Tasks(bolt/spout instances):Task就是具體的處理邏輯對象,每一個Spout和Bolt會被當作很多task在整個集群里面執行。

每一個task對應到一個線程,而stream grouping則是定義怎么從一堆task發射tuple到另外一堆task。

可以調用TopologyBuilder.setSpout和TopBuilder.setBolt來設置並行度 — 也就是有多少個task,tasks的數目, 可以不配置, 默認和executor1:1, 也可以通過

setNumTasks()配置。

注意:

1、並行度主要就是調整executor的數量,但是調整之后的executor的數量必須小於等於task的數量!

如果 分配的executor的線程數task數量多的話也只能分配和task數量相等的executor

2、如果設置了多個task實例,但是並行度executor並沒有很大提高!例如Spout只有兩個線程(executor)去運行這些實例,是沒有意義的,當然rebalance的時候用到!

rebalance不需要修改代碼,就可以動態修改topology的並行度executor,這樣的話就必須提前配置好多個(task)實例,在rebalance的時候主要是對之前設置多余的任務實例分配線程去執行。只有設置足夠多的線程和實例才可以真正的提高並行度。

3、 worker是進程,executor對應於線程,spout或bolt是一個個的task

同一個worker只會執行同一個topology相關的task,即:一個worder執行一個topology的一部分task,因為topology由多台物理機上的worder構成的!

在同一個executor中可以執行多個同類型的task, 即在同一個executor中,要么全部是bolt類的task,要么全部是 spout類的task

運行的時候,spout和bolt需要被包裝成一個又一個task

TASK的存在只是為了topology擴展的靈活性,與並行度無關。

總結一下:worker>executor>task 要想提高storm的並行度可以從三個方面來改造worker(進程)>executor(線程)>task(實例)增加work進程,增加executor線程,增加task實例!


 

第二:

了解Storm中的拓撲的並發度

上圖中的3段話依次如下:

  • Storm集群中的其中1台機器可能運行着屬於多個拓撲(可能為1個)的多個worker進程(可能為1個)。每個worker進程運行着特定的某個拓撲的executors。
  • 1個或多個excutor可能運行於1個單獨的worker進程,每1個executor從屬於1個被worker process生成的線程中。每1個executor運行着相同的組件(spout或bolt)的1個或多個task。
  • 1個task執行着實際的數據處理。

1個worker進程執行一個拓撲的子集。1個worker進程從屬於1個特定的拓撲,並運行着這個拓撲的1個或多個組件(spout或bolt)的1個或多個executor。一個運行中的拓撲包括集群中的許多台機器上的許多個這樣的進程。

1個executor是1個worker進程生成的1個線程。它可能運行着1個相同的組件(spout或bolt)的1個或多個task。

1 個task執行着實際的數據處理,你用代碼實現的每一個spout或bolt就相當於分布於整個集群中的許多個task。在1個拓撲的生命周期中,1個組 件的task的數量總是一樣的,但是1個組件的executor(線程)的數量可以隨着時間而改變。這意味着下面的條件總是成立:thread的數量 <= task的數量。默認情況下,task的數量與executor的數量一樣,例如,Storm會在每1個線程運行1個task。

下面附上一段程序來說明:

     TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout("spout", new RandomSentenceSpout(), 5).setNumTasks(4);    //executors數目設置為5,即線程數為5,task為4
    builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");  //executors數目設置為8,即線程數為8,task默認為1
    builder.setBolt("count", new WordCount(), 4).fieldsGrouping("spout", new Fields("ming"));   //executors數目設置為4,即線程數為4

    Config conf = new Config();
    conf.setDebug(false);

    conf.setNumWorkers(3);                                     //這里是設置Topology的Workers數
    StormSubmitter.submitTopology("word-count", conf, builder.createTopology());

 

參考:http://blog.chinaunix.net/uid-28379365-id-5017449.html 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM