摘要: 在Hadoop生態圈中,針對大數據進行批量計算時,通常需要一個或者多個MapReduce作業來完成,但這種批量計算方式是滿足不了對實時性要求高的場景。那Storm是怎么做到的呢?
博主福利 給大家贈送一套hadoop視頻課程
授課老師是百度 hadoop 核心架構師
內容包括hadoop入門、hadoop生態架構以及大型hadoop商業實戰案例。
講的很細致, MapReduce 就講了 15 個小時。
學完后可以勝任 hadoop 的開發工作,很多人學的這個課程找到的工作。
(包括指導書、練習代碼、和用到的軟件都打包了)
先到先得先學習。聯系老師微信ganshiyu1026,備注OSchina。即可免費領取
部分視頻截圖展示
流式計算解決方案-Storm
在Hadoop生態圈中,針對大數據進行批量計算時,通常需要一個或者多個MapReduce作業來完成,但這種批量計算方式是滿足不了對實時性要求高的場景。
Storm是一個開源分布式實時計算系統,它可以實時可靠地處理流數據。
本章內容:
1) Storm特點
2) Storm基本概念
3) Storm分組模式
4) Storm系統架構
5) Storm容錯機制
6) 一個簡單的Storm實現
1. Storm特點
在Storm出現之前,進行實時處理是非常痛苦的事情,我們主要的時間都花在關注往哪里發消息,從哪里接收消息,消息如何序列化,真正的業務邏輯只占了源代碼的一小部分。一個應用程序的邏輯運行在很多worker上,但這些worker需要各自單獨部署,還需要部署消息隊列。最大問題是系統很脆弱,而且不是容錯的:需要自己保證消息隊列和worker進程工作正常。
Storm完整地解決了這些問題。它是為分布式場景而生的,抽象了消息傳遞,會自動地在集群機器上並發地處理流式計算,讓你專注於實時處理的業務邏輯。
Storm有如下特點:
1) 編程簡單:開發人員只需要關注應用邏輯,而且跟Hadoop類似,Storm提供的編程原語也很簡單
2) 高性能,低延遲:可以應用於廣告搜索引擎這種要求對廣告主的操作進行實時響應的場景。
3) 分布式:可以輕松應對數據量大,單機搞不定的場景
4) 可擴展:隨着業務發展,數據量和計算量越來越大,系統可水平擴展
5) 容錯:單個節點掛了不影響應用
6) 消息不丟失:保證消息處理
不過Storm不是一個完整的解決方案。使用Storm時你需要關注以下幾點:
1) 如果使用的是自己的消息隊列,需要加入消息隊列做數據的來源和產出的代碼
2) 需要考慮如何做故障處理:如何記錄消息處理的進度,應對Storm重啟,掛掉的場景
3) 需要考慮如何做消息的回退:如果某些消息處理一直失敗怎么辦?
2. Storm與Hadoop區別
1) 定義及架構
Hadoop是Apache的一個項目,是一個能夠對大量數據進行分布式處理的軟件框架。
Storm是Apache基金會的孵化項目,是應用於流式數據實時處理領域的分布式計算系統。
Hadoop |
Storm |
|
系統角色 |
JobTracker |
Nimbus |
TaskTracker |
Supervisor |
|
Child |
Worker |
|
應用名稱 |
Job |
Topology |
組件接口 |
Mapper/Reducer |
Spout/Bolt |
2) 應用方面
Hadoop是分布式批處理計算,強調批處理,常用於數據挖掘和分析。
Storm是分布式實時計算,強調實時性,常用於實時性要求較高的地方。
3) 計算處理方式
Hadoop是磁盤級計算,進行計算時,數據在磁盤上,需要讀寫磁盤;Hadoop應用MapReduce的思想,將數據切片計算來處理大量的離線數據。Hadoop處理的數據必須是已經存放在HDFS上或者類似HBase的數據庫中,所以Hadoop實現的時候是通過移動計算到這些存放數據的機器上來提高效率的。
Storm是內存級計算,數據直接通過網絡導入內存。Storm是一個流計算框架,處理的數據是實時消息隊列中的,需要寫好一個Topology邏輯,然后將接收進來的數據進行處理,所以Storm是通過移動數據平均分配到機器資源來獲得高效率的。
4) 數據處理方面
數據來源:Hadoop是HDFS上某個文件夾下的數據,數據量可能以TB來計;而Storm則是實時新增的某一筆數據。
處理過程:Hadoop是Map階段到Reduce階段的;Storm是由用戶定義處理流程,流程中可以包含多個步驟,每個步驟可以是數據源(SPOUT),也可以是處理邏輯(BOLT)。
是否結束:Hadoop最后必須要結束;而Storm沒有結束狀態,到最后一步時,就停在那,直到有新數據進入時再重新開始。
處理速度:Hadoop以處理HDFS上大量數據為目的,速度慢;Storm只要處理新增的某一筆數據即可,故此它的速度很快。
適用場景:Hadoop主要是處理一批數據,對時效性要求不高,需要處理就提交一個JOB;而Storm主要是處理某一新增數據的,故此時效性要求高。
總結,Hadoop和Storm並沒有真的優劣之分,它們只是在各自的領域上有着獨特的性能而已,若是真的把它們進行單純的比較,反而是有失公平了。事實上,只有在最合適的方面使用最合適的大數據平台,才能夠真正體現出它們的價值,也才能夠真正為我們的工作提供最為便捷的助力!
3. Storm基本概念
1) Topology
一個Storm拓撲打包了一個實時處理程序的邏輯。一個Storm拓撲跟一個MapReduce的任務(job)是類似的。主要區別是MapReduce任務最終會結束,而拓撲會一直運行(當然直到你殺死它)。一個拓撲是一個通過流分組(Stream Grouping)把Spout和Bolt連接到一起的拓撲結構。圖的每條邊代表一個Bolt訂閱了其他Spout或者Bolt的輸出流。一個拓撲就是一個復雜的多階段的流計算。
2) Tuple
元組是Storm提供的一個輕量級的數據格式,可以用來包裝你需要實際處理的數據。元組是一次消息傳遞的基本單元。一個元組是一個命名的值列表,其中的每個值都可以是任意類型的。元組是動態地進行類型轉化的—字段的類型不需要事先聲明。在Storm中編程時,就是在操作和轉換由元組組成的流。通常,元組包含整數,字節,字符串,浮點數,布爾值和字節數組等類型。要想在元組中使用自定義類型,就需要實現自己的序列化方式。
3) Stream
流是Storm中的核心抽象。一個流由無限的元組序列組成,這些元組會被分布式並行地創建和處理。通過流中元組包含的字段名稱來定義這個流。
每個流聲明時都被賦予了一個ID。只有一個流的Spout和Bolt非常常見,所以OutputFieldsDeclarer提供了不需要指定ID來聲明一個流的函數(Spout和Bolt都需要聲明輸出的流)。這種情況下,流的ID是默認的“default”。
4) Spout
Spout(噴嘴,這個名字很形象)是Storm中流的來源。通常Spout從外部數據源,如消息隊列中讀取元組數據並吐到拓撲里。Spout可以是可靠的(reliable)或者不可靠(unreliable)的。可靠的Spout能夠在一個元組被Storm處理失敗時重新進行處理,而非可靠的Spout只是吐數據到拓撲里,不關心處理成功還是失敗了。
Spout可以一次給多個流吐數據。此時需要通過OutputFieldsDeclarer的declareStream函數來聲明多個流並在調用SpoutOutputCollector提供的emit方法時指定元組吐給哪個流。
Spout中最主要的函數是nextTuple,Storm框架會不斷調用它去做元組的輪詢。如果沒有新的元組過來,就直接返回,否則把新元組吐到拓撲里。nextTuple必須是非阻塞的,因為Storm在同一個線程里執行Spout的函數。
Spout中另外兩個主要的函數是Ack和fail。當Storm檢測到一個從Spout吐出的元組在拓撲中成功處理完時調用Ack,沒有成功處理完時調用Fail。只有可靠型的Spout會調用Ack和Fail函數。
5) Bolt
在拓撲中所有的計算邏輯都是在Bolt中實現的。一個Bolt可以處理任意數量的輸入流,產生任意數量新的輸出流。Bolt可以做函數處理,過濾,流的合並,聚合,存儲到數據庫等操作。Bolt就是流水線上的一個處理單元,把數據的計算處理過程合理的拆分到多個Bolt、合理設置Bolt的task數量,能夠提高Bolt的處理能力,提升流水線的並發度。
Bolt可以給多個流吐出元組數據。此時需要使用OutputFieldsDeclarer的declareStream方法來聲明多個流並在使用[OutputColletor](https://storm.apache.org/javadoc/apidocs/backtype/storm/task/OutputCollector.html)的emit方法時指定給哪個流吐數據。
當你聲明了一個Bolt的輸入流,也就訂閱了另外一個組件的某個特定的輸出流。如果希望訂閱另一個組件的所有流,需要單獨挨個訂閱。InputDeclarer有語法糖來訂閱ID為默認值的流。例如declarer.shuffleGrouping("redBolt")訂閱了redBolt組件上的默認流,跟declarer.shuffleGrouping("redBolt", DEFAULT_STREAM_ID)是相同的。
在Bolt中最主要的函數是execute函數,它使用一個新的元組當作輸入。Bolt使用OutputCollector對象來吐出新的元組。Bolts必須為處理的每個元組調用OutputCollector的ack方法以便於Storm知道元組什么時候被各個Bolt處理完了(最終就可以確認Spout吐出的某個元組處理完了)。通常處理一個輸入的元組時,會基於這個元組吐出零個或者多個元組,然后確認(ack)輸入的元組處理完了,Storm提供了IBasicBolt接口來自動完成確認。
必須注意OutputCollector不是線程安全的,所以所有的吐數據(emit)、確認(ack)、通知失敗(fail)必須發生在同一個線程里。更多信息可以參照問題定位
6) Task
每個Spout和Bolt會以多個任務(Task)的形式在集群上運行。每個任務對應一個執行線程,流分組定義了如何從一組任務(同一個Bolt)發送元組到另外一組任務(另外一個Bolt)上。可以在調用TopologyBuilder的setSpout和setBolt函數時設置每個Spout和Bolt的並發數。
7) Component
組件(component)是對Bolt和Spout的統稱
8) Stream Grouping
定義拓撲的時候,一部分工作是指定每個Bolt應該消費哪些流。流分組定義了一個流在一個消費它的Bolt內的多個任務(task)之間如何分組。流分組跟計算機網絡中的路由功能是類似的,決定了每個元組在拓撲中的處理路線。
在Storm中有七個內置的流分組策略,你也可以通過實現CustomStreamGrouping接口來自定義一個流分組策略:
洗牌分組(Shuffle grouping): 隨機分配元組到Bolt的某個任務上,這樣保證同一個Bolt的每個任務都能夠得到相同數量的元組。
字段分組(Fields grouping): 按照指定的分組字段來進行流的分組。例如,流是用字段“user-id”來分組的,那有着相同“user-id”的元組就會分到同一個任務里,但是有不同“user-id”的元組就會分到不同的任務里。這是一種非常重要的分組方式,通過這種流分組方式,我們就可以做到讓Storm產出的消息在這個”user-id”級別是嚴格有序的,這對一些對時序敏感的應用(例如,計費系統)是非常重要的。
Partial Key grouping: 跟字段分組一樣,流也是用指定的分組字段進行分組的,但是在多個下游Bolt之間是有負載均衡的,這樣當輸入數據有傾斜時可以更好的利用資源。這篇論文很好的解釋了這是如何工作的,有哪些優勢。
All grouping: 流會復制給Bolt的所有任務。小心使用這種分組方式。
Global grouping: 整個流會分配給Bolt的一個任務。具體一點,會分配給有最小ID的任務。
不分組(None grouping): 說明不關心流是如何分組的。目前,None grouping等價於洗牌分組。
Direct grouping:一種特殊的分組。對於這樣分組的流,元組的生產者決定消費者的哪個任務會接收處理這個元組。只能在聲明做直連的流(direct streams)上聲明Direct groupings分組方式。只能通過使用emitDirect系列函數來吐元組給直連流。一個Bolt可以通過提供的TopologyContext來獲得消費者的任務ID,也可以通過OutputCollector對象的emit函數(會返回元組被發送到的任務的ID)來跟蹤消費者的任務ID。
Local or shuffle grouping:如果目標Bolt在同一個worker進程里有一個或多個任務,元組就會通過洗牌的方式分配到這些同一個進程內的任務里。否則,就跟普通的洗牌分組一樣。
9) Reliability
Storm保證了拓撲中Spout產生的每個元組都會被處理。Storm是通過跟蹤每個Spout所產生的所有元組構成的樹形結構並得知這棵樹何時被完整地處理來達到可靠性。每個拓撲對這些樹形結構都有一個關聯的“消息超時”。如果在這個超時時間里Storm檢測到Spout產生的一個元組沒有被成功處理完,那Spout的這個元組就處理失敗了,后續會重新處理一遍。
為了發揮Storm的可靠性,需要你在創建一個元組樹中的一條邊時告訴Storm,也需要在處理完每個元組之后告訴Storm。這些都是通過Bolt吐元組數據用的OutputCollector對象來完成的。標記是在emit函數里完成,完成一個元組后需要使用Ack函數來告訴Storm。
10) Workers
拓撲以一個或多個Worker進程的方式運行。每個Worker進程是一個物理的Java虛擬機,執行拓撲的一部分任務。例如,如果拓撲的並發設置成了300,分配了50個Worker,那么每個Worker執行6個任務(作為Worker內部的線程)。Storm會盡量把所有的任務均分到所有的Worker上。
4. Storm系統架構
1) 主節點(Nimbus):
在分布式系統中,調度服務非常重要,它的設計,會直接關系到系統的運行效率,錯誤恢復(fail over),故障檢測(error detection)和水平擴展(scale)的能力。
集群上任務(task)的調度由一個Master節點來負責。這台機器上運行的Nimbus進程負責任務的調度。另外一個進程是Storm UI,可以界面上查看集群和所有的拓撲的運行狀態。
2) 從節點(Supervisor)
Storm集群上有多個從節點,他們從Nimbus上下載拓撲的代碼,然后去真正執行。Slave上的Supervisor進程是用來監督和管理實際運行業務代碼的進程。在Storm 0.9之后,又多了一個進程Logviewer,可以用Storm UI來查看Slave節點上的log文件。
3) 協調服務Zookeeper:
ZooKeeper在Storm上不是用來做消息傳輸用的,而是用來提供協調服務(coordination service),同時存儲拓撲的狀態和統計數據。
l Supervisor,Nimbus和worker都在ZooKeeper留下約定好的信息。例如Supervisor啟動時,會在ZooKeeper上注冊,Nimbus就可以發現Supervisor;Supervisor在ZooKeeper上留下心跳信息,Nimbus通過這些心跳信息來對Supervisor進行健康檢測,檢測出壞節點
l 由於Storm組件(component)的狀態信息存儲在ZooKeeper上,所以Storm組件就可以無狀態,可以 kill -9來殺死
例如:Supervisors/Nimbus的重啟不影響正在運行中的拓撲,因為狀態都在ZooKeeper上,從ZooKeeper上重新加載一下就好了
l 用來做心跳
Worker通過ZooKeeper把孩子executor的情況以心跳的形式匯報給Nimbus
Supervisor進程通過ZK把自己的狀態也以心跳的形式匯報給Nimbua
l 存儲最近任務的錯誤情況(拓撲停止時會刪除)
4) 進程Worker
運行具體處理組件邏輯的進程,一個Topology可能會在一個或者多個worker里面執行,每個worker是一個物理JVM並且執行整個Topology的一部分
例如:對於並行度是300的topology來說,如果我們使用50個工作進程來執行,那么每個工作進程會處理其中的6個tasks,Storm會盡量均勻的工作分配給所有的worker
5) Task
Worker中的每一個spout/bolt的線程稱為一個task,每一個spout和bolt會被當作很多task在整個集群里執行,每一個executor對應到一個線程,在這個線程上運行多個task,Stream Grouping則是定義怎么從一堆task發射tuple到另外一堆task,可以調用TopologyBuilder類的setSpout和setBolt來設置並行度(也就是有多少個task)
5. Storm容錯機制
Storm的容錯機制包括架構容錯和數據容錯。
1) 架構容錯:
Nimbus和Supervisor進程被設計成快速失敗(fail fast)的(當遇到異常的情況,進程就會掛掉)並且是無狀態的(狀態都保存在Zookeeper或者在磁盤上)。
最重要的是,worker進程不會因為Nimbus或者Supervisor掛掉而受影響。這跟Hadoop是不一樣的,當JobTracker掛掉,所有的任務都會沒了。
當Nimbus掛掉會怎樣?
如果Nimbus是以推薦的方式處於進程監管(例如通過supervisord)之下,那它會被重啟,不會有任何影響。
否則當Nimbus掛掉后:
l 已經存在的拓撲可以繼續正常運行,但是不能提交新拓撲
l 正在運行的worker進程仍然可以繼續工作。而且當worker掛掉,supervisor會一直重啟worker。
l 失敗的任務不會被分配到其他機器(是Nimbus的職責)上了
當一個Supervisor(slave節點)掛掉會怎樣?
如果Supervisor是以推薦的方式處於進程監管(例如通過(supervisord)[supervisord.org/])之下,那它會被重啟,不會有任何影響
否則當Supervisor掛掉:分配到這台機器的所有任務(task)會超時,Nimbus會把這些任務(task)重新分配給其他機器。
當一個worker掛掉會怎么樣?
當一個worker掛掉,supervisor會重啟它。如果啟動一直失敗那么此時worker也就不能和Nimbus保持心跳了,Nimbus會重新分配worker到其他機器。
Nimbus算是一個單點故障嗎?
如果Nimbus節點掛掉,worker進程仍然可以繼續工作。而且當worker掛掉,supervisor會一直重啟worker。但是,沒有了Nimbus,當需要的時候(如果worker機器掛掉了)worker就不能被重新分配到其他機器了。
所以答案是,Nimbus在“某種程度”上屬於單點故障的。在實際中,這種情況沒什么大不了的,因為當Nimbus進程掛掉,不會有災難性的事情發生
2) 數據容錯:
Storm中的每一個Topology中都包含有一個Acker組件。 Acker組件的任務就是跟蹤從某個task中的Spout流出的每一個messageId所綁定的Tuple樹中的所有Tuple的處理情況。如果在用戶設置的最大超時時間(timetout 可以通過 Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS來指定)內這些Tuple沒有被完全處理,那么Acker會告訴Spout該消息處理失敗,相反則會告知Spout該消息處理成功,它會分別調用Spout中的fail和ack方法。
6. 一個簡單的Storm實現
實現一個拓撲包括一個spout和兩個bolt。Spout發送單詞。每個bolt在輸入數據的尾部追加字符串“!!!”。三個節點排成一條線:spout發射給首個bolt,然后,這個bolt再發射給第二個bolt。如果spout發射元組“bob”和“john”,然后,第二個bolt將發射元組“bob!!!!!!”和“john!!!!!!”。
1) 其中Topology代碼如下,定義整個網絡拓撲圖:
TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("words", new TestWordSpout(), 10); builder.setBolt("exclaim1", new ExclamationBolt(), 3) .shuffleGrouping("words"); builder.setBolt("exclaim2", new ExclamationBolt(), 2) .shuffleGrouping("exclaim1"); |
2) Spout實現:
public void nextTuple() { Utils.sleep(100); final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"}; final Random rand = new Random(); final String word = words[rand.nextInt(words.length)]; _collector.emit(new Values(word)); } |
3) Bolt實現:
public static class ExclamationBolt implements IRichBolt { OutputCollector _collector; public void prepare(Map conf, TopologyContext context, OutputCollector collector) { _collector = collector; } public void execute(Tuple tuple) { _collector.emit(tuple, new Values(tuple.getString(0) + "!!!")); _collector.ack(tuple); } public void cleanup() { } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } } |
7. Storm常用配置
1) Config.TOPOLOGY_WORKERS:
這個設置用多少個工作進程來執行這個topology。比如,如果你把它設置成25, 那么集群里面一共會有25個java進程來執行這個topology的所有task。如果你的這個topology里面所有組件加起來一共有150的並行度,那么每個進程里面會有6個線程(150 / 25 = 6)。
2) Config.TOPOLOGY_ACKERS:
這個配置設置acker任務的並行度。默認的acker任務並行度為1,當系統中有大量的消息時,應該適當提高acker任務的並發度。設置為0,通過此方法,當Spout發送一個消息的時候,它的ack方法將立刻被調用;
3) Config.TOPOLOGY_MAX_SPOUT_PENDING:
這個設置一個spout task上面最多有多少個沒有處理的tuple(沒有ack/failed)回復, 我們推薦你設置這個配置,以防止tuple隊列爆掉。
4) Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS:
這個配置storm的tuple的超時時間 – 超過這個時間的tuple被認為處理失敗了。這個設置的默認設置是30秒