一、storm與Hadoop對比
Hadoop:
全量數據處理使用的大多是鼎鼎大名的hadoop或者hive,作為一個批處理系統,hadoop以其吞吐量大、自動容錯等優點,在海量數據處理上得到了廣泛的使用。
Hadoop下的Map/Reduce框架對於數據的處理流程是:
1、 將要處理的數據上傳到Hadoop的文件系統HDFS中。
2、 Map階段
a) Master對Map的預處理:對於大量的數據進行切分,划分為M個16~64M的數據分片(可通過參數自定義分片大小)
b) 調用Mapper函數:Master為Worker分配Map任務,每個分片都對應一個Worker進行處理。各個Worker讀取並調用用戶定義的Mapper函數 處理數據,並將結果存入HDFS,返回存儲位置給Master。
一個Worker在Map階段完成時,在HDFS中,生成一個排好序的Key-values組成的文件。並將位置信息匯報給Master。
3、 Reduce階段
a) Master對Reduce的預處理:Master為Worker分配Reduce任務,他會將所有Mapper產生的數據進行映射,將相同key的任務分配給某個Worker。
b) 調用Reduce函數:各個Worker將分配到的數據集進行排序(使用工具類Merg),並調用用戶自定義的Reduce函數,並將結果寫入HDFS。
每個Worker的Reduce任務完成后,都會在HDFS中生成一個輸出文件。Hadoop並不將這些文件合並,因為這些文件往往會作為另一個Map/reduce程序的輸入。
以上的流程,粗略概括,就是從HDFS中獲取數據,將其按照大小分片,進行分布式處理,最終輸出結果。從流程來看,Hadoop框架進行數據處理有以下要求:
1、 數據已經存在在HDFS當中。
2、 數據間是少關聯的。各個任務執行器在執行負責的數據時,無需考慮對其他數據的影響,數據之間應盡可能是無聯系、不會影響的。
使用Hadoop,適合大批量的數據處理,這是他所擅長的。由於基於Map/Reduce這種單級的數據處理模型進行,因此,如果數據間的關聯系較大,需要進行數據的多級交互處理(某個階段的處理數據依賴於上一個階段),需要進行多次map/reduce。又由於map/reduce每次執行都需要遍歷整個數據集,對於數據的實時計算並不合適,於是有了storm。
Storm:
對比Hadoop的批處理,Storm是個實時的、分布式以及具備高容錯的計算系統。同Hadoop一樣Storm也可以處理大批量的數據,然而Storm在保證高可靠性的前提下還可以讓處理進行的更加實時;也就是說,所有的信息都會被處理。Storm同樣還具備容錯和分布計算這些特性,這就讓Storm可以擴展到不同的機器上進行大批量的數據處理。他同樣還有以下的這些特性:
- 易於擴展:對於擴展,伴隨着業務的發展,我們的數據量、計算量可能會越來越大,所以希望這個系統是可擴展的。你只需要添加機器和改變對應的topology(拓撲)設置。Storm使用Hadoop Zookeeper進行集群協調,這樣可以充分的保證大型集群的良好運行。
- 每條信息的處理都可以得到保證。
- Storm集群管理簡易。
- Storm的容錯機能:一旦topology遞交,Storm會一直運行它直到topology被廢除或者被關閉。而在執行中出現錯誤時,也會由Storm重新分配任務。這是分布式系統中通用問題。一個節點掛了不能影響我的應用。
- 低延遲。都說了是實時計算系統了,延遲是一定要低的。
- 盡管通常使用Java,Storm中的topology可以用任何語言設計。
在線實時流處理模型
對於處理大批量數據的Map/reduce程序,在任務完成之后就停止了,但Storm是用於實時計算的,所以,相應的處理程序會一直執行(等待任務,有任務則執行)直至手動停止。
對於Storm,他是實時處理模型,與hadoop的不同是,他是針對在線業務而存在的計算平台,如統計某用戶的交易量、生成為某個用戶的推薦列表等實時性高的需求。他是一個“流處理”框架。何謂流處理?storm將數據以Stream的方式,並按照Topology的順序,依次處理並最終生成結果。
二、storm
Storm集群和Hadoop集群表面上看很類似。但是Hadoop上運行的是MapReduce jobs,而在Storm上運行的是拓撲(topology),這兩者之間是非常不一樣的。一個關鍵的區別是: 一個MapReduce job最終會結束, 而一個topology永遠會運行(除非你手動kill掉)。
Storm集群主要由一個主節點(Nimbus后台程序)和一群工作節點(worker node)Supervisor的節點組成,通過 Zookeeper進行協調。Nimbus類似Hadoop里面的JobTracker。Nimbus負責在集群里面分發代碼,分配計算任務給機器, 並且監控狀態。
每一個工作節點上面運行一個叫做Supervisor的節點。Supervisor會監聽分配給它那台機器的工作,根據需要啟動/關閉工作進程。每一個工作進程執行一個topology的一個子集;一個運行的topology由運行在很多機器上的很多工作進程組成。

1、 Nimbus主節點:
主節點通常運行一個后台程序 —— Nimbus,用於響應分布在集群中的節點,分配任務和監測故障。這個很類似於Hadoop中的Job Tracker。
2、Supervisor工作節點:
工作節點同樣會運行一個后台程序 —— Supervisor,用於收聽工作指派並基於要求運行工作進程。每個工作節點都是topology中一個子集的實現。而Nimbus和Supervisor之間的協調則通過Zookeeper系統或者集群。
3、Zookeeper
Zookeeper是完成Supervisor和Nimbus之間協調的服務。而應用程序實現實時的邏輯則被封裝進Storm中的“topology”。topology則是一組由Spouts(數據源)和Bolts(數據操作)通過Stream Groupings進行連接的圖。下面對出現的術語進行更深刻的解析。
4、Worker:
運行具體處理組件邏輯的進程。
5、Task:
worker中每一個spout/bolt的線程稱為一個task. 在storm0.8之后,task不再與物理線程對應,同一個spout/bolt的task可能會共享一個物理線程,該線程稱為executor。
6、Topology(拓撲):
storm中運行的一個實時應用程序,因為各個組件間的消息流動形成邏輯上的一個拓撲結構。一個topology是spouts和bolts組成的圖, 通過stream groupings將圖中的spouts和bolts連接起來,如下圖:

一個topology會一直運行直到你手動kill掉,Storm自動重新分配執行失敗的任務, 並且Storm可以保證你不會有數據丟失(如果開啟了高可靠性的話)。如果一些機器意外停機它上面的所有任務會被轉移到其他機器上。
運行一個topology很簡單。首先,把你所有的代碼以及所依賴的jar打進一個jar包。然后運行類似下面的這個命令:
storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2
這個命令會運行主類: backtype.strom.MyTopology, 參數是arg1, arg2。這個類的main函數定義這個topology並且把它提交給Nimbus。storm jar負責連接到Nimbus並且上傳jar包。
Topology的定義是一個Thrift結構,並且Nimbus就是一個Thrift服務, 你可以提交由任何語言創建的topology。上面的方面是用JVM-based語言提交的最簡單的方法。
7、Spout:
消息源spout是Storm里面一個topology里面的消息生產者。簡而言之,Spout從來源處讀取數據並放入topology。Spout分成可靠和不可靠兩種;當Storm接收失敗時,可靠的Spout會對tuple(元組,數據項組成的列表)進行重發;而不可靠的Spout不會考慮接收成功與否只發射一次。
消息源可以發射多條消息流stream。使用OutputFieldsDeclarer.declareStream來定義多個stream,然后使用SpoutOutputCollector來發射指定的stream。
而Spout中最主要的方法就是nextTuple(),該方法會發射一個新的tuple到topology,如果沒有新tuple發射則會簡單的返回。
要注意的是nextTuple方法不能阻塞,因為storm在同一個線程上面調用所有消息源spout的方法。
另外兩個比較重要的spout方法是ack和fail。storm在檢測到一個tuple被整個topology成功處理的時候調用ack,否則調用fail。storm只對可靠的spout調用ack和fail。
8、Bolt:
Topology中所有的處理都由Bolt完成。即所有的消息處理邏輯被封裝在bolts里面。Bolt可以完成任何事,比如:連接的過濾、聚合、訪問文件/數據庫、等等。
Bolt從Spout中接收數據並進行處理,如果遇到復雜流的處理也可能將tuple發送給另一個Bolt進行處理。即需要經過很多blots。比如算出一堆圖片里面被轉發最多的圖片就至少需要兩步:第一步算出每個圖片的轉發數量。第二步找出轉發最多的前10個圖片。(如果要把這個過程做得更具有擴展性那么可能需要更多的步驟)。
Bolts可以發射多條消息流, 使用OutputFieldsDeclarer.declareStream定義stream,使用OutputCollector.emit來選擇要發射的stream。
而Bolt中最重要的方法是execute(),以新的tuple作為參數接收。不管是Spout還是Bolt,如果將tuple發射成多個流,這些流都可以通過declareStream()來聲明。
bolts使用OutputCollector來發射tuple,bolts必須要為它處理的每一個tuple調用OutputCollector的ack方法,以通知Storm這個tuple被處理完成了,從而通知這個tuple的發射者spouts。 一般的流程是: bolts處理一個輸入tuple, 發射0個或者多個tuple, 然后調用ack通知storm自己已經處理過這個tuple了。storm提供了一個IBasicBolt會自動調用ack。
9、Tuple:
一次消息傳遞的基本單元。本來應該是一個key-value的map,但是由於各個組件間傳遞的tuple的字段名稱已經事先定義好,所以tuple中只要按序填入各個value就行了,所以就是一個value list.
10、Stream:
源源不斷傳遞的tuple就組成了stream。消息流stream是storm里的關鍵抽象。一個消息流是一個沒有邊界的tuple序列, 而這些tuple序列會以一種分布式的方式並行地創建和處理。通過對stream中tuple序列中每個字段命名來定義stream。在默認的情況下,tuple的字段類型可以是:integer,long,short, byte,string,double,float,boolean和byte array。你也可以自定義類型(只要實現相應的序列化器)。
每個消息流在定義的時候會被分配給一個id,因為單向消息流使用的相當普遍, OutputFieldsDeclarer定義了一些方法讓你可以定義一個stream而不用指定這個id。在這種情況下這個stream會分配個值為‘default’默認的id 。
Storm提供的最基本的處理stream的原語是spout和bolt。你可以實現spout和bolt提供的接口來處理你的業務邏輯。

11、Stream Groupings:
Stream Grouping定義了一個流在Bolt任務間該如何被切分。這里有Storm提供的6個Stream Grouping類型:
1). 隨機分組(Shuffle grouping):隨機分發tuple到Bolt的任務,保證每個任務獲得相等數量的tuple。
2). 字段分組(Fields grouping):根據指定字段分割數據流,並分組。例如,根據“user-id”字段,相同“user-id”的元組總是分發到同一個任務,不同“user-id”的元組可能分發到不同的任務。
3). 全部分組(All grouping):tuple被復制到bolt的所有任務。這種類型需要謹慎使用。
4). 全局分組(Global grouping):全部流都分配到bolt的同一個任務。明確地說,是分配給ID最小的那個task。
5). 無分組(None grouping):你不需要關心流是如何分組。目前,無分組等效於隨機分組。但最終,Storm將把無分組的Bolts放到Bolts或Spouts訂閱它們的同一線程去執行(如果可能)。
6). 直接分組(Direct grouping):這是一個特別的分組類型。元組生產者決定tuple由哪個元組處理者任務接收。
當然還可以實現CustomStreamGroupimg接口來定制自己需要的分組。
storm 和hadoop的對比來了解storm中的基本概念。
| Hadoop | Storm | |
| 系統角色 | JobTracker | Nimbus |
| TaskTracker | Supervisor | |
| Child | Worker | |
| 應用名稱 | Job | Topology |
| 組件接口 | Mapper/Reducer | Spout/Bolt |
3. Storm應用場景
Storm 與其他大數據解決方案的不同之處在於它的處理方式。Hadoop 在本質上是一個批處理系統。數據被引入 Hadoop 文件系統 (HDFS) 並分發到各個節點進行處理。當處理完成時,結果數據返回到 HDFS 供始發者使用。Storm 支持創建拓撲結構來轉換沒有終點的數據流。不同於 Hadoop 作業,這些轉換從不停止,它們會持續處理到達的數據。
Twitter列舉了Storm的三大類應用:
1. 信息流處理{Stream processing}
Storm可用來實時處理新數據和更新數據庫,兼具容錯性和可擴展性。即Storm可以用來處理源源不斷流進來的消息,處理之后將結果寫入到某個存儲中去。
2. 連續計算{Continuous computation}
Storm可進行連續查詢並把結果即時反饋給客戶端。比如把Twitter上的熱門話題發送到瀏覽器中。
3. 分布式遠程程序調用{Distributed RPC}
Storm可用來並行處理密集查詢。Storm的拓撲結構是一個等待調用信息的分布函數,當它收到一條調用信息后,會對查詢進行計算,並返回查詢結果。舉個例子Distributed RPC可以做並行搜索或者處理大集合的數據。
通過配置drpc服務器,將storm的topology發布為drpc服務。客戶端程序可以調用drpc服務將數據發送到storm集群中,並接收處理結果的反饋。這種方式需要drpc服務器進行轉發,其中drpc服務器底層通過thrift實現。適合的業務場景主要是實時計算。並且擴展性良好,可以增加每個節點的工作worker數量來動態擴展。
