理解Storm並發


作者:Jack47

PS:如果喜歡我寫的文章,歡迎關注我的微信公眾賬號程序員傑克,兩邊的文章會同步,也可以添加我的RSS訂閱源

注:本文主要內容翻譯自understanding-the-parallelism-of-a-storm-topology

本篇文章介紹了Storm拓撲的並發模型。介紹了Worker進程,Executor(線程)和Task(任務)之間的關系,如何按照需要配置他們。本文基於Storm 0.8.1版本,最新發布版本已經到了0.9.5了。
對於不了解Storm的朋友,可以先去看看Storm介紹(一)

拓撲的組成部分#

在Storm集群上運行的拓撲主要包含以下的三個實體:

  • Worker進程
  • Executors
  • Tasks(任務)

下圖簡單闡釋了他們之間的關系:
Storm_worker_processes_executors_tasks

圖1:Storm中worker進程,executor(線程)和任務的關系

一個正在運行的拓撲由很多worker進程組成,這些worker進程在Storm集群的多台機器上運行。一個worker進程屬於一個特定的拓撲並且執行這個拓撲的一個或多個component(spout或者bolt)的一個或多個executor。一個worker進程就是一個Java虛擬機(JVM),它執行一個拓撲的一個子集。

一個executor是由一個worker進程產生的一個線程,它運行在worker的Java虛擬機里。一個executor為同一個component(spout或bolt)運行一個或多個任務。一個executor總會有一個線程來運行executor所有的task,這說明task在executor內部是串行執行的。

真正的數據處理邏輯是在task里執行的,在父executor線程執行過程中會運行task。在代碼中實現的每個spout或bolt是在全集群中以很多task的形式運行的。一個component的task數量在這個拓撲的生命周期中是固定不變的,但是一個component的executor(線程)數量會隨着時間推移發生變化。這說明以下條件一直成立:threads數量 <= task數量。默認情況下task數量被設置成跟executor的數量是一樣的,即Storm會在每個線程上執行一個任務(這通常是你想要的)。

同時請注意:

  • executor線程的數量在拓撲已經啟動后,可以發生變化(見下面的storm rebalance命令)。
  • 拓撲的task數量是固定的

可以看“理解Storm內部的消息緩存”來從另外一個視角來看一個worker進程生命周期中運行的各種各樣的線程和跟這些線程關聯的executor和task。

配置拓撲的並發度#

注意Storm術語中“並發度(parallelism)“特定地用來描述所謂的"parallelism hint",它代表了一個component初始的executor(線程)數量。但在這篇文章中我使用更廣泛意義的“並發度“來描述如何配置一個Storm拓撲中executor數量,worker進程數量以及task數量。當使用Storm中狹義的“並發度”時,我會特殊說明。

下表是各種配置項的概述以及如何在代碼中設置。有不止一種方法來設置這些選項,但是下表只列出了其中一些方法。Storm目前配置項的優先級是:外部component特定的配置>內部component特定的配置項>拓撲特定的配置項>storm.yaml>defaults.yaml。更多詳情可以參閱Storm文檔

用途 描述 配置項 如何通過代碼設置(例子)
worker進程數量 拓撲在集群機器上運行時需要的worker進程數據量 Config#TOPOLOGY_WORKERS Config#setNumWorkers
每個組件需要創建的executor數量 executor線程的數量 沒有單獨配置項 TopologyBuilder#setSpout()TopologyBuidler#setBolt() Storm 0.8之后使用 parallelism_hint參數來指定executor的初始數量
task數量 每個組件需要創建的task數量 Config#TOPOLOGY_TASKS ComponentConfigurationDeclarer#setNumTasks()

下面是一段如何實際設置這些配置的示例性代碼:

topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2)
.setNumTasks(4)
.shuffleGrouping("blue-spout");

在上述代碼中我們配置Storm以兩個executor和4個task的初始數量去運行greenBolt。Storm會在每個executor(線程)上運行兩個任務。如果你沒有顯示配置任務的數量,Storm會默認每個executor運行一個任務。

在多租戶的Storm集群上配置並發#

在Storm 0.8.2中引入了隔離調度器,讓多個拓撲很容易且安全地共享一個集群,例如,它解決了多租戶的問題--避免多個拓撲之間的資源競爭--通過提供拓撲間的完全隔離

當使用隔離調度器的時候Nathan[Storm作者]建議你把worker數量設置成機器數量的倍數。把executor數量設置成worker數量的倍數。如果你會調用setNumTasks()(多數人不會),應該設置成executor數量的倍數。這樣做了之后,你的拓撲的負載就會均勻分布。每台機器和每個Java虛擬機進程會有相同數量的線程和大致等量的負載。Jason Jackson

運行中的Storm拓撲的例子#

下圖闡釋了在實際中一個簡單拓撲看起來長什么樣子。拓撲包含三個組件(component):一個叫BlueSpoutSpout,兩個分別叫做GreenBoltYellowBoltBolt。組件相互連接而構成一個圖結構:BlueSpout把輸出發送給GreenBolt,同樣GreenBolt把結果發送給YellowBolt

Storm_example_of_a_running_topology

運行中拓撲的例子

還記得上面展示的配置GreenBolt代碼嗎?BlueSpoutYellowBolt只設置了parallelism_hint(executor數量)。下面是相關代碼:

  Config conf = new Config();
  conf.setNumWorkers(2); // use two worker processes

  topologyBuilder.setSpout("blue-spout", new BlueSpout(), 2); // parallelism hint topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2).setNumTasks(4).shuffleGrouping("blue-spout");
  topologyBuilder.setBolt("yellow-bolt", new YellowBolt(), 6).shuffleGrouping("green-bolt");

  StormSubmitter.submitToplogy("mytopology", conf, topologyBuilder.createTopology());

改變正在運行拓撲的並發度#

Storm一個靈巧的功能是可以增減worker進程或者executor的數量而不需要重啟集群或者拓撲。這種做法叫做rebalancing

有兩種方法可以用來做拓撲的rebalance:

  1. 使用Storm web UI來做
  2. 使用下面介紹的命令行工具storm rebalance

命令行的例子:

# 重新配置“mytopology”拓撲使用5個worker進程[原來是2個]
# "blue-spout"這個spout使用3個[原來有2個]executor
# "yellow-bolt"使用10個[原來有6個]executor
$ storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10

留給讀者的問題###

經過上面的rebalance命令,此時每個Bolt各自有幾個executor,幾個task,每個worker里面分配了幾個executor?

參考資料#

Storm文檔

Storm介紹(一)

Storm教程


如果您看了本篇博客,覺得對您有所收獲,請點擊右下角的“推薦”,讓更多人看到!
資助Jack47寫作,打賞一個雞蛋灌餅錢吧
pay_weixin
微信打賞
pay_alipay
支付寶打賞


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM