Apache Storm內部原理分析


轉自:http://shiyanjun.cn/archives/1472.html

 

本文算是個人對Storm應用和學習的一個總結,由於不太懂Clojure語言,所以無法更多地從源碼分析,但是參考了官網、好多朋友的文章,以及《Storm Applied: Strategies for real-time event processing》這本書,以及結合自己使用Storm的經歷,希望對於想深入一點了解Storm原理的朋友能有所幫助,有不足之處歡迎拍磚交流。

Storm集群架構

Storm集群采用主從架構方式,主節點是Nimbus,從節點是Supervisor,有關調度相關的信息存儲到ZooKeeper集群中,架構如下圖所示:
storm-cluster
具體描述,如下所示:

  • Nimbus

Storm集群的Master節點,負責分發用戶代碼,指派給具體的Supervisor節點上的Worker節點,去運行Topology對應的組件(Spout/Bolt)的Task。

  • Supervisor

Storm集群的從節點,負責管理運行在Supervisor節點上的每一個Worker進程的啟動和終止。通過Storm的配置文件中的supervisor.slots.ports配置項,可以指定在一個Supervisor上最大允許多少個Slot,每個Slot通過端口號來唯一標識,一個端口號對應一個Worker進程(如果該Worker進程被啟動)。

  • ZooKeeper

用來協調Nimbus和Supervisor,如果Supervisor因故障出現問題而無法運行Topology,Nimbus會第一時間感知到,並重新分配Topology到其它可用的Supervisor上運行。

Stream Groupings

Storm中最重要的抽象,應該就是Stream grouping了,它能夠控制Spot/Bolt對應的Task以什么樣的方式來分發Tuple,將Tuple發射到目的Spot/Bolt對應的Task,如下圖所示:
storm-topology-tasks
目前,Storm Streaming Grouping支持如下幾種類型:

  • Shuffle Grouping:隨機分組,跨多個Bolt的Task,能夠隨機使得每個Bolt的Task接收到大致相同數目的Tuple,但是Tuple不重復
  • Fields Grouping:根據指定的Field進行分組 ,同一個Field的值一定會被發射到同一個Task上
  • Partial Key Grouping:與Fields grouping 類似,根據指定的Field的一部分進行分組分發,能夠很好地實現Load balance,將Tuple發送給下游的Bolt對應的Task,特別是在存在數據傾斜的場景,使用 Partial Key grouping能夠更好地提高資源利用率
  • All Grouping:所有Bolt的Task都接收同一個Tuple(這里有復制的含義)
  • Global Grouping:所有的流都指向一個Bolt的同一個Task(也就是Task ID最小的)
  • None Grouping:不需要關心Stream如何分組,等價於Shuffle grouping
  • Direct Grouping:由Tupe的生產者來決定發送給下游的哪一個Bolt的Task ,這個要在實際開發編寫Bolt代碼的邏輯中進行精確控制
  • Local or Shuffle Grouping:如果目標Bolt有1個或多個Task都在同一個Worker進程對應的JVM實例中,則Tuple只發送給這些Task

另外,Storm還提供了用戶自定義Streaming Grouping接口,如果上述Streaming Grouping都無法滿足實際業務需求,也可以自己實現,只需要實現backtype.storm.grouping.CustomStreamGrouping接口,該接口定義了如下方法:

1 List<Integer> chooseTasks(int taskId, List<Object> values)

上面幾種Streaming Group的內置實現中,最常用的應該是Shuffle Grouping、Fields Grouping、Direct Grouping這三種,使用其它的也能滿足特定的應用需求。

Acker原理

首先,我們理解一下Tuple Tree的概念,要計算英文句子中每個字母出現的次數,形成的Tuple Tree如下圖所示:
storm-tuple-tree
對上述這個例子,也就是說,運行時每一個英文句子都會對應一個Tuple Tree,一個Tuple Tree可能很大,也可能很小,與具體的業務需求有關。
另外,Acker也是一個Bolt組件,只不過我們實現處理自己業務邏輯時,不需要關心Acker Bolt的實現,在提交實現的Topology到Storm集群后,會在初始化Topology時系統自動為我們的Topology增加Acker這個Bolt組件,它的主要功能是負責跟蹤我們自己實現的Topology中各個Spout/Bolt所處理的Tuple之間的關系(或者可以說是,跟蹤Tuple Tree的處理進度)。
下面,我們描述一下Acker的機制,如下所示:

  1. Spout的一個Task創建一個Tuple時,即在Spout的nextTuple()方法中實現從特定數據源讀取數據的處理邏輯中,會與Acker進行通信,向Acker發送消息,Acker保存該Tuple對應信息:{:spout-task task-id :val ack-val)}
  2. Bolt在emit一個新的子Tuple時,會保存子Tuple與父Tuple的關系
  3. 在Bolt中進行ack時,會計算出父Tuple與由該父Tuple新生成的所有子Tuple的一個異或值,將該值發送給Acker(計算異或值:tuple-id ^ (child-tuple-id1 ^ child-tuple-id2 … ^ child-tuple-idN))。可見,這里Bolt並沒有把所有生成的子Tuple發送給Acker,這要比發送一個異或值大得多了,只發送一個異或值大大降低了Bolt與Acker之間網絡通信的開銷
  4. Acker收到Bolt發送的異或值,與當前保存的task-id對應的初始ack-val做異或,tuple-id與ack-val相同,異或結果為0,但是子Tuple的child-tuple-id等並不互相相同,只有等所有的子Tuple的child-tuple-id都執行ack回來,最后ack-val就為0,表示整個Tuple樹處理成功。無論成功與失敗,最后都要從Acker維護的隊列中移除。
  5. 最后,Acker會向產生該原始父Tuple的Spout對應的Task發送通知,成功或者失敗,回調Spout的ack或fail方法。如果我們在實現Spout時,重寫了ack和fail方法,處理回調就會執行這里的邏輯。

Storm設計:組件抽象

我們編寫的處理業務邏輯的Topology提交到Storm集群后,就會發生任務的調度和資源的分配,從而也會基於Storm的設計,出現各種各樣的組件。我們先看一下,Topology提交到Storm集群后的運行時部署分布圖,如下圖所示:
storm-topology-assignment
通過上圖我們可以看出,一個Topology的Spout/Bolt對應的多個Task可能分布在多個Supervisor的多個Worker內部。而每個Worker內部又存在多個Executor,根據實際對Topology的配置在運行時進行計算並分配。
從運行Topology的Supervisor節點,到最終的Task運行時對象,我們大概需要了解Storm抽象出來的一些概念,由於相對容易,我簡單說明一下:

  • Topology:Storm對一個分布式計算應用程序的抽象,目的是通過一個實現Topology能夠完整地完成一件事情(從業務角度來看)。一個Topology是由一組靜態程序組件(Spout/Bolt)、組件關系Streaming Groups這兩部分組成。
  • Spout:描述了數據是如何從外部系統(或者組件內部直接產生)進入到Storm集群,並由該Spout所屬的Topology來處理,通常是從一個數據源讀取數據,也可以做一些簡單的處理(為了不影響數據連續地、實時地、快速地進入到系統,通常不建議把復雜處理邏輯放在這里去做)。
  • Bolt:描述了與業務相關的處理邏輯。

上面都是一些表達靜態事物(組件)的概念,我們編寫完成一個Topology之后,上面的組件都以靜態的方式存在。下面,我們看一下提交Topology運行以后,會產生那些動態的組件(概念):

  • Task:Spout/Bolt在運行時所表現出來的實體,都稱為Task,一個Spout/Bolt在運行時可能對應一個或多個Spout Task/Bolt Task,與實際在編寫Topology時進行配置有關。
  • Worker:運行時Task所在的一級容器,Executor運行於Worker中,一個Worker對應於Supervisor上創建的一個JVM實例
  • Executor:運行時Task所在的直接容器,在Executor中執行Task的處理邏輯;一個或多個Executor實例可以運行在同一個Worker進程中,一個或多個Task可以運行於同一個Executor中;在Worker進程並行的基礎上,Executor可以並行,進而Task也能夠基於Executor實現並行計算

Topology並行度計算

有關Topology的並行度的計算,官網上有一篇文章介紹(后面參考鏈接中已附上),我們這里詳細解釋一下,對於理解Storm UI上面的一些統計數據也會有很大幫助。在編寫代碼設置並行度的時候,並行度只是一個提示信息,Storm會根據這個提示信息並結合其他一些參數配置(Task個數、Worker個數),去計算運行時的並行度,這個並行度實際上描述的是,組成一個Topology的多個Spout/Bolt的運行時表現實體Task的分布,所以我們可能會想關注從一個Topology的角度去看,這些設置了並行度的Spout/Bolt對應的運行時Task,在集群的多個Worker進程之間,以及Executor內部是如何分布的。
下面是例子給出的Topology的設計,如下圖所示:
storm-example-topology
對該例子Topology配置了2個Worker,對應的代碼示例如下所示:

01 conf.setNumWorkers(2); // 該Topology運行在Supervisor節點的2個Worker進程中
02  
03 topologyBuilder.setSpout("blue-spout"new BlueSpout(), 2); // 設置並行度為2,則Task個數為2*1
04  
05 topologyBuilder.setBolt("green-bolt"new GreenBolt(), 2)
06                .setNumTasks(4)
07                .shuffleGrouping("blue-spout"); // 設置並行度為2,設置Task個數為4 ,則Task個數為4
08  
09 topologyBuilder.setBolt("yellow-bolt"new YellowBolt(), 6)
10                .shuffleGrouping("green-bolt"); // 設置並行度為6,則Task個數為6*1

那么,下面我們看Storm是如何計算一個Topology運行時的並行度,並分配到2個Worker中的:

  • 計算Task總數:2*1+4+6*1=12(總計創建12個Task實例)
  • 計算運行時Topology並行度:10/2=5(每個Worker對應5個Executor)
  • 將12個Task分配到2個Worker中的5*2個Executor中:應該是每個Worker上5個Executor,將6個Task分配到5個Executor中
  • 每個Worker中分配6個Task,應該是分配3個Yellow Task、2個Green Task、1個Blue Task
  • Storm內部優化:會把同類型的Task盡量放到同一個Executor中運行
  • 分配過程:從Task個數最少的開始,1個Blue Task只能放到一個Executor,總計1個Executor被占用;2個Green Task可以放到同一個Executor中,總計2個Executor被占用;最后看剩下的3個Yellow Task能否分配到5-2=3個Executor中,顯然每個Yellow Task對應一個Executor

從直觀上看,其實分配Task到多個Executor中的分配結果有很多種,都能滿足盡量讓同類型Task在相同的Executor中,有關Storm的計算實現可以參考源碼。
上述例子Topology在運行時,多個Task分配到集群中運行分布的結果,如下圖所示:
storm-example-topology-task-assignment-result

Storm內部原理

一個Topology提交到Storm集群上運行,具體的處理流程非常微妙,有點復雜。首先,我們通過要點的方式來概要地說明:

  • 每個Executor可能存在一個Incoming Queue和一個Outgoing Queue,這兩個隊列都是使用的LMAX Disruptor Queue(可以通過相關資料來了解)
  • 兩個LMAX Disruptor Queue的上游和下游,都會有相關線程去存儲/取出Tuple
  • 每個Executor可能存在一個Send Thread,用來將處理完成生成的新的Tuple放到屬於該Executor的Outgoing Queue隊列
  • 每個Executor一定存在一個Main Thread,用來處理銜接Spout Task/Bolt Task與前后的Incoming Queue、Outgoing Queue
  • 每個Worker進程內部可能存在一個Receive Thread,用來接收由上游Worker中的Transfer Thread發送過來的Tuple,在一個Worker內部Receive Thread是被多個Executor共享的
  • 每個Worker進程內部可能存在一個Outgoing Queue,用來存放需要跨Worker傳輸的Tuple(其內部的Transfer Thread會從該隊列中讀取Tuple進行傳輸)
  • 每個Worker進程內部可能存在一個Transfer Thread,用來將需要在Worker之間傳輸的Tuple發送到下游的Worker內

上面,很多地方我使用了“可能”,實際上大部分情況下是這樣的,注意了解即可。下面,我們根據Spout Task/Bolt Task運行時分布的不同情況,分別闡述如下:

Spout Task在Executor內部運行

Spout Task和Bolt Task運行時在Executor中運行有一點不同,如果Spout Task所在的同一個Executor中沒有Bolt Task,則該Executor中只有一個Outgoing Queue用來存放將要傳輸到Bolt Task的隊列,因為Spout Task需要從一個給定的數據源連續不斷地讀入數據而形成流。在一個Executor中,Spout Task及其相關組件的執行流程,如下圖所示:
storm-spout-task-in-executor
上圖所描述的數據流處理流程,如下所示:

  1. Spout Task從外部數據源讀取消息或事件,在實現的Topology中的Spout中調用nextTuple()方法,並以Tuple對象的格式emit()讀取到的數據
  2. Main Thread處理輸入的Tuple,然后放入到屬於該Executor的Outgoing Queue中
  3. 屬於該Executor的Send Thread從Outgoing Queue中讀取Tuple,並傳輸到下游的一個或多個Bolt Task去處理

Bolt Task在Executor內部運行

前面說過,Bolt Task運行時在Executor中與Spout Task有一點不同,一個Bolt Task所在的Executor中有Incoming Queue和Outgoing Queue這兩個隊列,Incoming Queue用來存放數據流處理方向上,該Bolt Task上游的組件(可能是一個或多個Spout Task/Bolt Task)發射過來的Tuple數據,Outgoing Queue用來存放將要傳輸到下游Bolt Task的隊列。如果該Bolt Task是數據流處理方向上最后一個組件,而且對應execute()方法沒有再進行emit()創建的Tupe數據,那么該Bolt Task就沒有Outgoing Queue這個隊列。在一個Executor中,一個Bolt Task用來銜接上游(Spout Task/Bolt Task)和下游(Bolt Task)的組件,在該Bolt Task所在的Executor內其相關組件的執行流程,如下圖所示:
storm-bolt-task-in-executor
上圖所描述的數據流處理流程,如下所示:

  1. Spout Task/Bolt Task將Tupe傳輸到下游該Bolt Task所在的Executor的Incoming Queue中
  2. Main Thread從該Executor的Incoming Queue中取出Tuple,並將Tupe發送給Bolt Task去處理
  3. Bolt Task執行execute()方法中的邏輯處理該Tuple數據,並生成新的Tuple,然后調用emit()方法將Tuple發送給下一個Bolt Task處理(這里,實際上是Main Thread將新生成的Tuple放入到該Executor的Outgoing Queue中)
  4. 屬於該Executor的Send Thread從Outgoing Queue中讀取Tuple,並傳輸到下游的一個或多個Bolt Task去處理

同一Worker內2個Spout Task/Bolt Task之間傳輸tuple

在同一個Worker JVM實例內部,可能創建多個Executor實例,那么我們了解一下,一個Tuple是如何在兩個Task之間傳輸的,可能存在4種情況,在同一個Executor中的情況有如下2種:

  • 1個Spout Task和1個Bolt Task在同一個Executor中
  • 2個Bolt Task在同一個Executor中

我們后面會對類似這種情況詳細說明,下面給出的是,2個不同的Executor中Task運行的情況,分別如下圖所示:

  • 1個Spout Task和1個Bolt Task在不同的2個Executor中

storm-transfer-tuples-between-spout-and-bolt-task-in-same-worker-different-executor

  • 2個Bolt Task在不同的2個Executor中

storm-transfer-tuples-between-2-bolt-tasks-in-same-worker-different-executor
通過前面了解一個Spout Task和一個Bolt Task運行的過程,對上面兩種情況便很好理解,不再累述。

不同Worker內2個Executor之間傳輸tuple

如果是在不同的Worker進程內,也就是在兩個隔離的JVM實例上,無論是否在同一個Supervisor節點上,Tuple的傳輸的邏輯是統一的。這里,以一個Spout Task和一個Bolt Task分別運行在兩個Worker進程內部為例,如下圖所示:
storm-transfer-tuples-between-spout-and-bolt-task-in-different-worker
處理流程和前面的類似,只不過如果兩個Worker進程分別在兩個Supervisor節點上,這里Transfer Thread傳輸Tuple走的是網絡,而不是本地。

Tuple在Task之間路由過程

下面,我們關心每一個Tuple是如何在各個Bolt的各個Task之間傳輸,如何將一個Tuple路由(Routing)到下游Bolt的多個Task呢?
這里,我們應該了解一下,作為在Task之間傳輸的消息的表示形式,定義TaskMessage類的代碼,如下所示:

01 package backtype.storm.messaging;
02  
03 import java.nio.ByteBuffer;
04  
05 public class TaskMessage {
06     private int _task;
07     private byte[] _message;
08  
09     public TaskMessage(int task, byte[] message) {
10         _task = task;
11         _message = message;
12     }
13  
14     public int task() {
15         return _task;
16     }
17  
18     public byte[] message() {
19         return _message;
20     }
21  
22     public ByteBuffer serialize() {
23         ByteBuffer bb = ByteBuffer.allocate(_message.length + 2);
24         bb.putShort((short) _task);
25         bb.put(_message);
26         return bb;
27     }
28  
29     public void deserialize(ByteBuffer packet) {
30         if (packet == null)
31             return;
32         _task = packet.getShort();
33         _message = new byte[packet.limit() - 2];
34         packet.get(_message);
35     }
36 }

可見,每一個Task都給定一個Topology內部唯一的編號,就能夠將任意一個Tuple正確地路由到下游需要處理該Tuple的Bolt Task。
假設,存在一個Topology,包含3個Bolt,分別為Bolt1、Bolt2、Bolt3,他們之間的關系也是按照編號的順序設置的,其中Bolt1有個2個Task,Bolt2有2個Task,Bolt3有2個Task,這里我們只關心Bolt1 Task到Bolt2 Task之間的數據流。具體的路由過程,如下圖所示:
storm-routing-tuples
上圖中,Bolt2的兩個Task分布到兩個Worker進程之內,所以,當上游的Bolt1的2個Task處理完輸入的Tuple並生成新的Tuple后,會有根據Bolt2的Task的編號,分別進行如下處理:

  • Bolt2 Task4分布在第一個Worker進程內,則Bolt1生成的新的Tupe直接由該Executor的Send Thread,放到第一個Worker內部的另一個Executor的Incoming Queue
  • Bolt2 Task5分布在第二個Worker進程內,則Bolt1生成的新的Tupe被Executor的Send Thread放到該Executor所在的第一個Worker的Outgoing Queue中,由第一個Worker的Transfer Thread發送到另一個Worker內(最終路由給Bolt2 Task5去處理)

通過上面處理流程可以看出,每個Executor應該維護Task與所在的Executor之間的關系,這樣才能正確地將Tuple傳輸到目的Bolt Task進行處理。

參考資料


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM