[轉]JStorm介紹


一、簡介

Storm是開源的分布式容錯實時計算系統,目前被托管在GitHub上,遵循 Eclipse Public License 1.0。最初由BackType開發,現在已被Twitter收入麾下。Storm最新版本是Storm 0.9,核心采用Clojure實現。Storm為分布式實時計算提供了一組通用原語,可被用於“流處理”之中,實時處理消息;Storm也可被用於“連續計算”(continuous computation),對數據流做連續處理,在計算時就將結果以流的形式輸出給用戶;它還可被用於“分布式RPC”,以並行的方式執行運算。

Storm主要特點如下:
0、簡單的編程模型。類似於MapReduce降低了並行批處理復雜性,Storm降低了實時處理的復雜性。
1、語言無關。Storm的消息處理組件可以用任何語言來定義。默認支持Clojure、Java、Ruby和Python。要增加對其他語言的支持,只需實現一個簡單的Storm通信協議即可。
2、容錯性。如果在消息處理過程中出了一些異常,Storm會重新調度出問題的處理邏輯。Storm保證一個處理單元永遠運行,除非顯式殺掉。
3、可伸縮性。Storm的可伸縮性可以使其每秒處理的消息量達到很高。為了擴展一個實時計算任務,需要做的就是增加節點並且提高計算任務的並行度設置(parallelism setting)。Storm應用在10個節點的集群上每秒可以處理高達1000000個消息,包括每秒一百多次的數據庫調用[5]。同時Storm使用ZooKeeper來協調集群內的各種配置使得Storm的集群可以很容易擴展。
4、保證無數據丟失。實時系統必須保證所有的數據被成功的處理。 那些會丟失數據的系統的適用場景非常窄,而Storm保證每一條消息都會被處理。
5、適用場景廣泛。消息流處理、持續計算、分布式方法調用等是Storm適用場景廣泛的基礎,Storm的這些基礎原語可以滿足大量的場景。

雖然Storm具備諸多優勢,但也存在不足:
0、Storm目前還存在Nimbus SPOF的問題;
1、存在雪崩問題;
2、資源粒度較粗;
3、Clojure實現引入了學習成本;

為此,阿里巴巴中間件團隊用Java重新實現了類Storm的JStorm,同樣被托管在GitHub上,遵循 Eclipse Public License 1.0,目前版本0.9.3。相關資料顯示,阿里巴巴內部已經大規模部署了Storm/JStorm集群。
JStorm繼承了Storm的所有優點,同時與Storm相比JStorm所特有的如下特點:
0、兼容Storm接口。開發者在Storm上運行的程序無需任何修改即可運行在JStorm上。
1、Nimbus HA。解決了Storm的Nimbus單點問題,支持自動熱備切換Nimbus。
2、更細粒度的資源划分。JStorm從CPU、MEMORY、DISK和NET四個維度進行任務調度,同時不存在任務搶占問題。
3、可定制的任務調度機制。(Storm的任務調度目前也可定制)
4、更好的性能。通過底層ZeroMQ和Netty使JStorm具有更好的性能,同時具有更好的穩定性。
5、解決了Storm的雪崩問題。通過Netty和disruptor機制實現RPC保證可以匹配的數據發送和接收速度避免雪崩問題。

此外,JStorm通過減少對zookeeper的訪問量、增加反序列化線程、優化ACK、增加監控內容及JAVA本身優勢等各個方面優化了Storm的性能和穩定性。總之,JStorm比Storm更強大、更穩定、性能更好
(本文后面所述關於JStorm的部分內容同樣適用Storm)

二、數據模型

JStorm通過一系列基本元素實現實時計算的目標,其中包括了Topology、Stream、Spout、Bolt等等。JStorm在模型上和MapReduce有很多相似的地方,下表從不同維度對JStorm和MapReduce進行了比較。

 

MapReduce

JStorm

Role

JobTracker

Nimbus

TaskTracker

Supervisor

Child

Worker

Application

Job

Topology

Interface

Mapper/Reducer

Spout/Bolt

實時計算任務需要打包成Topology提交,和MapReduce Job相似,不同的是,MapReduce Job在計算完成后結束,而JStorm的Topology任務一旦提交永遠不會結束,除非顯式停止。

計算任務Topology是由不同的Spout和Bolt通過Stream連接起來的DAG圖。下面是一個典型Topology的結構示意圖:

 

其中:

Spout:JStorm的消息源。用於生產消息,一般是從外部數據源(如MQ/RDBMS/NoSQL/RTLog等)不間斷讀取數據並向下游發送消息。

Bolt:JStorm的消息處理者。用於為Topology進行消息處理,Bolt可以執行查詢、過濾、聚合及各種復雜運算操作,Bolt的消息處理結果可以作為下游Bolt的輸入不斷迭代。

Stream:JStorm中對數據進行的抽象,它是時間上無界的Tuple元組序列。在Topology中Spout是Stream的源頭,負責從特定數據源發射Stream;Bolt可以接收任意多個Stream輸入然后進行數據的加工處理,如果需要Bolt還可以發射出新Stream給下游Bolt。

Tuple:JStorm使用Tuple作為數據模型,存在於任意兩個有數據交互的組件(Spout/Bolt)之間。每個Tuple是一組具有各自名稱的值,值可以是任何類型,JStorm支持所有的基本類型、字符串以及字節數組,也可以使用自定義類型(需實現對應序列化器)作為值類型。簡單來說,Tuple就是一組實現了序列化器帶有名稱的Java對象集合。

從整個Topology上看,Spout/Bolt可以看作DAG的節點,Stream是連接不同節點之間的有向邊,Tuple則是流過Stream的數據集合。
下面是一個Topology內部Spout和Bolt之間的數據流關系:

 

Topology中每一個計算組件(Spout和Bolt)都有一個並行度,在創建Topology時指定(默認為1),JStorm在集群內分配對應個數的線程Task並行。

如上圖示,既然對於Spout/Bolt都會有多個線程來並行執行,那么如何在兩個組件(Spout和Bolt)之間發送Tuple會成為新的問題。

JStorm通過定義Topology時為每個Bolt指定輸入Stream以及指定提供的若干種數據流分發(Stream Grouping)策略用來解決這一問題。

JStorm提供了以下幾種Stream Grouping策略:
0) Shuffle Grouping:隨機分組,隨機派發Stream里面的Tuple,保證每個Bolt接收到的Tuple數目大致相同,通過輪詢隨機的方式使得下游Bolt之間接收到的Tuple數目差值不超過1。
1) Fields Grouping:按字段分組,具有同樣字段值的Tuple會被分到相同Bolt里的Task,不同字段值則會被分配到不同Task。
2) All Grouping:廣播分組,每一個Tuple,所有的Bolt都會收到。
3) Global Grouping:全局分組,Tuple被分配到Bolt中ID值最低的的一個Task。
4) Non Grouping:不分組,Tuple會按照完全隨機的方式分發到下游Bolt。
5) Direct Grouping:直接分組,Tuple需要指定由Bolt的哪個Task接收。 只有被聲明為Direct Stream的消息流可以聲明這種分組方法。
6) Local or Shuffle Grouping:基本同Shuffle Grouping。
7) Custom Grouping:用戶自定義分組策略,CustomStreamGrouping是自定義分組策略時用戶需要實現的接口。

三、系統架構

 

JStorm與Hadoop相似,保持了Master/Slave的簡潔優雅架構。與Hadoop不同,JStorm的M/S之間不是直接通過RPC交換心跳信息,而是借助ZK來實現,這樣的設計雖然引入了第三方依賴,但是簡化了Nimbus/Supervisor的設計,同時也極大提高了系統的容錯能力。

整個JStorm系統中共存三類不同的Daemon進程,分別是Nimbus,Supervisor和Worker。

Nimbus:JStorm中的主控節點,Nimbus類似於MR的JT,負責接收和驗證客戶端提交的Topology,分配任務,向ZK寫入任務相關的元信息,此外,Nimbus還負責通過ZK來監控節點和任務健康情況,當有Supervisor節點變化或者Worker進程出現問題時及時進行任務重新分配。Nimbus分配任務的結果不是直接下發給Supervisor,也是通過ZK維護分配數據進行過渡。特別地,JStorm 0.9.0領先Apache Storm實現了Nimbus HA,由於Nimbus是Stateless節點,所有的狀態信息都交由ZK托管,所以HA相對比較簡單,熱備Nimbus subscribe ZK關於Master活躍狀態數據,一旦發現Master出現問題即從ZK里恢復數據后可以立即接管。

Supervisor:JStorm中的工作節點,Supervisor類似於MR的TT,subscribe ZK分配到該節點的任務數據,根據Nimbus的任務分配情況啟動/停止工作進程Worker。Supervisor需要定期向ZK寫入活躍端口信息以便Nimbus及時監控。Supervisor不執行具體的數據處理工作,所有的數據處理工作都交給Worker完成。

Worker:JStorm中任務執行者,Worker類似於MR的Task,所有實際的數據處理工作最后都在Worker內執行完成。Worker需要定期向Supervsior匯報心跳,由於在同一節點,同時為保持節點的無狀態,Worker定期將狀態信息寫入本地磁盤,Supervisor通過讀本地磁盤狀態信息完成心跳交互過程。Worker綁定一個獨立端口,Worker內所有單元共享Worker的通信能力。

Nimbus、Supervisor和Worker均為Stateless節點,支持Fail-Fast,這為JStorm的擴展性和容錯能力提供了很好的保障。

還剩一個問題是Topology的各個計算組件(Spout/Bolt)如何映射到計算資源上。梳理這個問題前需要先明確Worker/Executor/Task之間的關系:

0、Worker:完整的Topology任務是由分布在多個Supervisor節點上的Worker進程(JVM)來執行,每個Worker都執行且僅執行Topology任務的一個子集。

1、Executor:Worker內部會有一個或多個Executor,每個Executor對應一個線程。Executor包括SpoutExecutor和BoltExecutor,同一個Worker里所有的*Executor只能屬於某一個Topology里的執行單元。

2、Task:執行具體數據處理實體,也就是用戶實現的Spout/Blot實例。一個Executor可以對應多個Task,定義Topology時指定,默認Executor和Task一一對應。這就是說,系統中Executor數量一定是小於等於Task數量(#Executor≤#Task)。

下圖給出了一個簡單的例子,上半部分描述的是Topology結構及相關說明,其中定義了整個Topology的worker=2,DAG關系,各個計算組件的並行度;下半部分描述了Topology的Task在Supervisor節點的分布情況。從中可以看出Topology到Executor之間的關系。

 

0、Worker數在提交Topology時在配置文件中指定;

例:#Worker=2

1、執行線程/Executor數在定義Topology的各計算組件並行度時決定,可以不指定,默認為1。其中各個計算組件的並行度之和即為該Topology執行線程總數。

例:#Executor=sum(#parallelism hint)=2+2+6=10

2、Task數目也在定義Toplogy時確定,若不指定默認每個Executor線程對應一個Task,若指定Task數目會在指定數目的線程里平均分配。

例:#Task=sum(#task)=2+4+6=12,其中Executor4={Task0,Task1}

四、 關鍵流程

 

0、Topology提交

JStorm為用戶提供了StormSubmitter. submitTopology用來向集群提交Topology,整個提交流程:

Client端:
0)客戶端簡單驗證;
1)檢查是否已經存在同名Topology;
2)提交jar包;
3)向Nimbus提交Topology;

Nimbus端:
0)Nimbus端簡單合法性檢查;
1)生成Topology Name;
2)序列化配置文件和Topology Code;
3)Nimbus本地准備運行時所需數據;
4)向ZK注冊Topology和Task;
5)將Task壓入分配隊列等待TopologyAssign分配;

1、任務調度策略

從0.9.0開始,JStorm提供非常強大的調度功能,基本上可以滿足大部分的需求,同時支持自定義任務調度策略。JStorm的資源不再僅是Worker的端口,而從CPU/Memory/Disk/Net等四個維度綜合考慮。
Nimbus任務調度算法[2]如下:
0)優先使用自定義任務分配算法,當資源無法滿足需求時,該任務放到下一級任務分配算法;
1)使用歷史任務分配算法(如果打開使用歷史任務屬性),當資源無法滿足需求時,該任務放到下一級任務分配算法;
2)使用默認資源平衡算法,計算每個Supervisor上剩余資源權值,取權值最高的Supervisor分配任務。

2、Acker機制

為保證無數據丟失,Storm/JStorm使用了非常漂亮的可靠性處理機制,如圖當定義Topology時指定Acker,JStorm除了Topology本身任務外,還會啟動一組稱為Acker的特殊任務,負責跟蹤Topolgogy DAG中的每個消息。每當發現一個DAG被成功處理完成,Acker就向創建根消息的Spout任務發送一個Ack信號。Topology中Acker任務的並行度默認parallelism hint=1,當系統中有大量的消息時,應該適當提高Acker任務的並行度。

 

Acker按照Tuple Tree的方式跟蹤消息。當Spout發送一個消息的時候,它就通知對應的Acker一個新的根消息產生了,這時Acker就會創建一個新的Tuple Tree。當Acker發現這棵樹被完全處理之后,他就會通知對應的Spout任務。

 

Acker任務保存了數據結構Map<MessageID,Map< TaskID, Value>>,
其中MessageID是Spout根消息ID,TaskID是Spout任務ID,Value表示一個64bit的長整型數字,是樹中所有消息的隨機ID的異或結果。通過TaskID,Acker知道當消息樹處理完成后通知哪個Spout任務,通過MessageID,Acker知道屬於Spout任務的哪個消息被成功處理完成。Value表示了整棵樹的的狀態,無論這棵樹多大,只需要這個固定大小的數字就可以跟蹤整棵樹。當消息被創建和被應答的時候都會有相同的MessageID發送過來做異或。當Acker發現一棵樹的Value值為0的時候,表明這棵樹已經被成功處理完成。

例如,對於前面Topology中消息樹,Acker數據的變化過程:
Step0.A發送T0給B后:
R0=r0
<id0,<taskA,R0>>
Step1.B接收到T0並成功處理后向C發送T1,向D發送T2:
R1=R0^r1^r2=r0^r1^r2
<id0,<taskA,R0^R1>>
=<id0,<taskA,r0^r0^r1^r2>>
=<id0,<taskA,r1^r2>>
Step2.C接收到T1並成功處理后:
R2=r1
<id0,<taskA,r1^r2^R2>>
=<id0,<taskA,r1^r2^r1>>
=<id0,<taskA,r2>>
Step3.D接收到T2並成功處理后:
R3=r2
<id0,<taskA,r2^R3>>
=<id0,<taskA,r2^r2>>
=<id0,<taskA,0>>
當結果為0時Acker可以通知taskA根消息id0的消息樹已被成功處理完成。

需要指出的是,Acker並不是必須的,當實際業務可以容忍數據丟失情況下可以不用Acker,對數據丟失零容忍的業務必須打開Acker,另外當系統的消息規模較大是可適當增加Acker的並行度。

3、故障恢復

0)節點故障

Nimbus故障。Nimbus本身無狀態,所以Nimbus故障不會影響正在正常運行任務,另外Nimbus HA保證Nimbus故障后可以及時被備份Nimbus接管。
Supervisors節點故障。Supervisor故障后,Nimbus會將故障節點上的任務遷移到其他可用節點上繼續運行,但是Supervisor故障需要外部監控並及時手動重啟。
Worker故障。Worker健康狀況監控由Supervisor負責,當Woker出現故障時,Supervisor會及時在本機重試重啟。
Zookeeper節點故障。Zookeeper本身具有很好的故障恢復機制,能保證至少半數以上節點在線就可正常運行,及時修復故障節點即可。

1)任務失敗

Spout失敗。消息不能被及時被Pull到系統中,造成外部大量消息不能被及時處理,而外部大量計算資源空閑。
Bolt失敗。消息不能被處理,Acker持有的所有與該Bolt相關的消息反饋值都不能回歸到0,最后因為超時最終Spout的fail將被調用。
Acker失敗。Acker持有的所有反饋信息不管成功與否都不能及時反饋到Spout,最后同樣因為超時Spout的fail將被調用。
任務失敗后,需要Nimbus及時監控到並重新分配失敗任務。

五、基礎接口

這里把幾個基礎接口中注釋摘出來說明其的作用:

0、ISpout: ISpout is the core interface for implementing spouts. A Spout is responsible for feeding messages into the topology for processing. For every tuple emitted by a spout, Storm will track the (potentially very large) DAG of tuples generated based on a tuple emitted by the spout. When Storm detects that every tuple in that DAG has been successfully processed, it will send an ack message to the Spout.
1、IBolt: IBolt represents a component that takes tuples as input and produces tuples as output. An IBolt can do everything from filtering to joining to functions to aggregations. It does not have to process a tuple immediately and may hold onto tuples to process later.
2、TopologyBuilder: TopologyBuilder exposes the Java API for specifying a topology for Storm to execute.
3、StormSubmitter: Use this class to submit topologies to run on the Storm cluster.

針對前面例子中的Topology這里給出一個簡單的實現,其中略去了BlueSpout/GreeBolt/YellowBolt的具體實現,更多參考這里。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public static void main (String[] args){
   Config conf = new Config();
   // use two worker processes
   conf.setNumWorkers(2);
   // set parallelism hint to 2
   topologyBuilder.setSpout("blue-spout", new BlueSpout(), 2);
   topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2)
                  .setNumTasks(4)
                  .shuffleGrouping("blue-spout");
   topologyBuilder.setBolt("yellow-bolt", new YellowBolt(), 6)
                  .shuffleGrouping("green-bolt");
   StormSubmitter.submitTopology(
        "mytopology",
        conf,
        topologyBuilder.createTopology());
}

JStorm更多包括事務在內的接口詳見源碼。

六、結語

本文對JStorm做了簡單介紹,有錯誤之處敬請指正。

七、參考文檔

[1]Storm社區.http://storm.incubator.apache.org/

[2]JStorm源碼.https://github.com/alibaba/jstorm/

[3]Storm源碼.https://github.com/nathanmarz/storm/

[4]Jonathan Leibiusky, Gabriel Eisbruch, etc. Getting Started with Storm.http://shop.oreilly.com/product/0636920024835.do. O’Reilly Media, Inc.

[5]Xumingming Blog.http://xumingming.sinaapp.com/

[6]量子恆道官方博客.http://blog.linezing.com/

[7]Google Image.http://images.google.com


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM