作者:Jack47
PS:如果喜歡我寫的文章,歡迎關注我的微信公眾賬號程序員傑克,兩邊的文章會同步,也可以添加我的RSS訂閱源。
內容簡介#
本文是Storm系列之一,介紹了Storm的起源,Storm作者的八卦,Storm的特點和Storm模型的基本原理,着重介紹了Storm中的基本概念(Spout, Bolt, Stream, Tuple等)和對應的編程接口,可以作為Storm的入門文檔來閱讀。
八卦#
之前的技術文章都寫的有點一板一眼,太正經了。今天在文章正式開始前,跟大家八卦一下Storm的作者Nathan Marz吧。
Storm的作者是Nathan Marz,Nathan Marz在BackType公司工作的時候有了Storm的點子並獨自一人實現了Storm。在2011年Twitter准備收購BackType之際,Nathan Marz為了提高Twitter對BackType的估值,在一篇博客里向外界介紹了Storm。Twitter對這項技術非常感興趣,因此在Twitter收購BackType的時候Storm發揮了重大作用。后來Nathan Marz開源Storm時,也借着Twitter的品牌影響力而讓Storm名聲大震!
Storm的特點之一是可靠的消息處理機制,這個機制中最重要的一環是設計一個算法來跟蹤Storm中處理的數據,確保Storm知道消息是否被完整的處理。他創造出的這個算法,極大的簡化了系統的設計。Nathan Marz說這算法是他職業生涯中開發的最出色的算法之一,也說明了受過良好的計算機科學的教育是非常重要的。有趣的是發明這個算法的那天,正好是他和不久前遇到的一個姑娘約會的日子。當天因為發明了這個算法而非常興奮,導致他心思一直在這個算法上,毫無疑問就搞砸了和這個姑娘的約會!
Storm是什么#
Storm官方網站有段簡介
Storm是一個免費並開源的分布式實時計算系統。利用Storm可以很容易做到可靠地處理無限的數據流,像Hadoop批量處理大數據一樣,Storm可以實時處理數據。Storm簡單,可以使用任何編程語言。
在Storm之前,進行實時處理是非常痛苦的事情: 需要維護一堆消息隊列和消費者,他們構成了非常復雜的圖結構。消費者進程從隊列里取消息,處理完成后,去更新數據庫,或者給其他隊列發新消息。
這樣進行實時處理是非常痛苦的。我們主要的時間都花在關注往哪里發消息,從哪里接收消息,消息如何序列化,真正的業務邏輯只占了源代碼的一小部分。一個應用程序的邏輯運行在很多worker上,但這些worker需要各自單獨部署,還需要部署消息隊列。最大問題是系統很脆弱,而且不是容錯的:需要自己保證消息隊列和worker進程工作正常。
Storm完整地解決了這些問題。它是為分布式場景而生的,抽象了消息傳遞,會自動地在集群機器上並發地處理流式計算,讓你專注於實時處理的業務邏輯。
Storm的特點##
Storm有如下特點:
0. 編程簡單:開發人員只需要關注應用邏輯,而且跟Hadoop類似,Storm提供的編程原語也很簡單
- 高性能,低延遲:可以應用於廣告搜索引擎這種要求對廣告主的操作進行實時響應的場景。
- 分布式:可以輕松應對數據量大,單機搞不定的場景
- 可擴展: 隨着業務發展,數據量和計算量越來越大,系統可水平擴展
- 容錯:單個節點掛了不影響應用
- 消息不丟失:保證消息處理
不過Storm不是一個完整的解決方案。使用Storm時你需要關注以下幾點:
- 如果使用的是自己的消息隊列,需要加入消息隊列做數據的來源和產出的代碼
- 需要考慮如何做故障處理:如何記錄消息隊列處理的進度,應對Storm重啟,掛掉的場景
- 需要考慮如何做消息的回退:如果某些消息處理一直失敗怎么辦?
Storm的應用##
跟Hadoop不一樣,Storm是沒有包括任何存儲概念的計算系統。這就讓Storm可以用在多種不同的場景下:非傳統場景下數據動態到達或者數據存儲在數據庫這樣的存儲系統里(或數據是被實時操控其他設備的控制器(如交易系統)所消費)
Storm有很多應用:實時分析,在線機器學習(online machine learning),連續計算(continuous computation),分布式遠程過程調用(RPC)、ETL等。Storm處理速度很快:每個節點每秒鍾可以處理超過百萬的數據組。它是可擴展(scalable),容錯(fault-tolerant),保證你的數據會被處理,並且很容易搭建和操作。
例如Nathan Marz提供的例子,產生Twitter的趨勢信息。Twitter從海量推文中抽取趨勢信息,並在本地區域和國家層級進行維護。這意味者一旦一個案例開始出現,Twitter的話題趨勢算法就能實時的鑒別出這個話題。這個實時的算法就是通過在Storm上連續分析Twitter數據來實現的。
其他開源的大數據解決方案##
下表列出了一組開源的大數據解決方案,包括傳統的批處理和流式處理的應用程序。
解決方案 | 開發者 | 類型 | 描述 |
---|---|---|---|
Storm | 流式處理 | Twitter的流式處理大數據分析方案 | |
S4 | Yahoo! | 流式處理 | Yahoo!的分布式流式計算平台 |
Hadoop | Apache | 批處理 | MapReduce范式的第一個開源實現 |
Spark | UC Berkeley AMPLab | 批處理 | 支持內存數據集和彈性恢復的分析平台 |
Yahoo! S4和Storm之間的關鍵差別是Storm在故障的情況下可以保證消息的處理,而S4可能會丟消息。
Hadoop無疑是大數據分析的王者,本質上是一個批量處理系統,它專注於大數據的批量處理。數據存儲在Hadoop 文件系統里(HDFS)並在處理的時候分發到集群中的各個節點。當處理完成,產出的數據放回到HDFS上。在Storm上構建的拓撲處理的是持續不斷的流式數據。不同於Hadoop的任務,這些處理過程不會終止,會持續處理到達的數據。
Hadoop處理的是靜態的數據,而Storm處理的是動態的、連續的數據。Twitter的用戶每天都會發上千萬的推,所以這種處理技術是非常有用的。Storm不僅僅是一個傳統的大數據分析系統:它是一個復雜事件(complex event-processing)處理系統的例子。復雜事件處理系統通常是面向檢測和計算的,這兩部分都可以通過用戶定義的算法在Storm中實現。例如,復雜事件處理可以用來從大量的事件中區分出有意義的事件,然后對這些事件實時處理。
Storm模型#
Storm實現了一個數據流(data flow)的模型,在這個模型中數據持續不斷地流經一個由很多轉換實體構成的網絡。一個數據流的抽象叫做流(stream),流是無限的元組(Tuple)序列。元組就像一個可以表示標准數據類型(例如int,float和byte數組)和用戶自定義類型(需要額外序列化代碼的)的數據結構。每個流由一個唯一的ID來標示的,這個ID可以用來構建拓撲中各個組件的數據源。
如下圖所示,其中的水龍頭代表了數據流的來源,一旦水龍頭打開,數據就會源源不斷地流經Bolt而被處理。圖中有三個流,用不同的顏色來表示,每個數據流中流動的是元組(Tuple),它承載了具體的數據。元組通過流經不同的轉換實體而被處理。
Storm對數據輸入的來源和輸出數據的去向沒有做任何限制。像Hadoop,是需要把數據放到自己的文件系統HDFS里的。在Storm里,可以使用任意來源的數據輸入和任意的數據輸出,只要你實現對應的代碼來獲取/寫入這些數據就可以。典型場景下,輸入/輸出數據來是基於類似Kafka或者ActiveMQ這樣的消息隊列,但是數據庫,文件系統或者web服務也都是可以的。
概念##
Storm中涉及的主要概念有:
- 拓撲(Topologies)
- 元組(Tuple)
- 流(Streams)
- Spouts(噴嘴)
- Bolts
- 任務(Tasks)
- 組件(Component)
- 流分組(Stream groupings)
- 可靠性(Reliability)
- Workers(工作進程)
可以看到Storm中各個概念的名字起的非常好,也很形象。
拓撲(Topologies)##
一個Storm拓撲打包了一個實時處理程序的邏輯。一個Storm拓撲跟一個MapReduce的任務(job)是類似的。主要區別是MapReduce任務最終會結束,而拓撲會一直運行(當然直到你殺死它)。一個拓撲是一個通過流分組(stream grouping)把Spout和Bolt連接到一起的拓撲結構。圖的每條邊代表一個Bolt訂閱了其他Spout或者Bolt的輸出流。一個拓撲就是一個復雜的多階段的流計算。
資源###
- TopologyBuilder: 使用這個類來在Java中創建拓撲
- 在生產集群中運行拓撲
- 本地模式: 通過閱讀這篇可以學習到如何在本地模式下進行拓撲的開發和測試
元組(Tuple)##
元組是Storm提供的一個輕量級的數據格式,可以用來包裝你需要實際處理的數據。元組是一次消息傳遞的基本單元。一個元組是一個命名的值列表,其中的每個值都可以是任意類型的。元組是動態地進行類型轉化的--字段的類型不需要事先聲明。在Storm中編程時,就是在操作和轉換由元組組成的流。通常,元組包含整數,字節,字符串,浮點數,布爾值和字節數組等類型。要想在元組中使用自定義類型,就需要實現自己的序列化方式。
資源###
流(Streams)##
流是Storm中的核心抽象。一個流由無限的元組序列組成,這些元組會被分布式並行地創建和處理。通過流中元組包含的字段名稱來定義這個流。
每個流聲明時都被賦予了一個ID。只有一個流的Spout和Bolt非常常見,所以OutputFieldsDeclarer
提供了不需要指定ID來聲明一個流的函數(Spout和Bolt都需要聲明輸出的流)。這種情況下,流的ID是默認的“default”。
資源###
- OutputFieldsDeclarer: 用來聲明流和流的定義
- Serialization: Storm元組的動態類型轉化,聲明自定義的序列化方式
- ISerialization: 自定義的序列化必須實現這個接口
- CONFIG.TOPOLOGY_SERIALIZATIONS: 可以通過這個配置來注冊自定義的序列化接口
Spouts##
Spout(噴嘴,這個名字很形象)是Storm中流的來源。通常Spout從外部數據源,如消息隊列中讀取元組數據並吐到拓撲里。Spout可以是可靠的(reliable)或者不可靠(unreliable)的。可靠的Spout能夠在一個元組被Storm處理失敗時重新進行處理,而非可靠的Spout只是吐數據到拓撲里,不關心處理成功還是失敗了。
Spout可以一次給多個流吐數據。此時需要通過OutputFieldsDeclarer
的declareStream
函數來聲明多個流並在調用SpoutOutputCollector
提供的emit
方法時指定元組吐給哪個流。
Spout中最主要的函數是nextTuple
,Storm框架會不斷調用它去做元組的輪詢。如果沒有新的元組過來,就直接返回,否則把新元組吐到拓撲里。nextTuple
必須是非阻塞的,因為Storm在同一個線程里執行Spout的函數。
Spout中另外兩個主要的函數是ack
和fail
。當Storm檢測到一個從Spout吐出的元組在拓撲中成功處理完時調用ack
,沒有成功處理完時調用fail
。只有可靠型的Spout會調用ack
和fail
函數。更多細節可以查看Storm Java文檔和我的另外一篇文章:Storm如何保證可靠的消息處理
Bolts##
在拓撲中所有的計算邏輯都是在Bolt中實現的。一個Bolt可以處理任意數量的輸入流,產生任意數量新的輸出流。Bolt可以做函數處理,過濾,流的合並,聚合,存儲到數據庫等操作。Bolt就是流水線上的一個處理單元,把數據的計算處理過程合理的拆分到多個Bolt、合理設置Bolt的task數量,能夠提高Bolt的處理能力,提升流水線的並發度。
Bolt可以給多個流吐出元組數據。此時需要使用OutputFieldsDeclarer
的declareStream
方法來聲明多個流並在使用[OutputColletor](https://storm.apache.org/javadoc/apidocs/backtype/storm/task/OutputCollector.html)
的emit
方法時指定給哪個流吐數據。
當你聲明了一個Bolt的輸入流,也就訂閱了另外一個組件的某個特定的輸出流。如果希望訂閱另一個組件的所有流,需要單獨挨個訂閱。InputDeclarer有語法糖來訂閱ID為默認值的流。例如declarer.shuffleGrouping("redBolt")
訂閱了redBolt組件上的默認流,跟declarer.shuffleGrouping("redBolt", DEFAULT_STREAM_ID)
是相同的。
在Bolt中最主要的函數是execute
函數,它使用一個新的元組當作輸入。Bolt使用OutputCollector
對象來吐出新的元組。Bolts必須為處理的每個元組調用OutputCollector
的ack
方法以便於Storm知道元組什么時候被各個Bolt處理完了(最終就可以確認Spout吐出的某個元組處理完了)。通常處理一個輸入的元組時,會基於這個元組吐出零個或者多個元組,然后確認(ack)輸入的元組處理完了,Storm提供了IBasicBolt接口來自動完成確認。
必須注意OutputCollector
不是線程安全的,所以所有的吐數據(emit)、確認(ack)、通知失敗(fail)必須發生在同一個線程里。更多信息可以參照問題定位。
資源###
- IRichBolt: 這是Bolt的通用接口
- IBasicBolt: 很方便的Bolt接口,用於定義做過濾或者簡單處理的Bolt
- OutputCollector: Bolt通過這個類的實例來吐元組給輸出流
- 保證消息處理:
任務(Tasks)##
每個Spout和Bolt會以多個任務(Task)的形式在集群上運行。每個任務對應一個執行線程,流分組定義了如何從一組任務(同一個Bolt)發送元組到另外一組任務(另外一個Bolt)上。可以在調用TopologyBuilder
的setSpout
和setBolt
函數時設置每個Spout和Bolt的並發數。
組件(Component)##
組件(component)是對Bolt和Spout的統稱
流分組(Stream Grouping)##
定義拓撲的時候,一部分工作是指定每個Bolt應該消費哪些流。流分組定義了一個流在一個消費它的Bolt內的多個任務(task)之間如何分組。流分組跟計算機網絡中的路由功能是類似的,決定了每個元組在拓撲中的處理路線。
在Storm中有七個內置的流分組策略,你也可以通過實現CustomStreamGrouping
接口來自定義一個流分組策略:
- 洗牌分組(Shuffle grouping): 隨機分配元組到Bolt的某個任務上,這樣保證同一個Bolt的每個任務都能夠得到相同數量的元組。
- 字段分組(Fields grouping): 按照指定的分組字段來進行流的分組。例如,流是用字段“user-id"來分組的,那有着相同“user-id"的元組就會分到同一個任務里,但是有不同“user-id"的元組就會分到不同的任務里。這是一種非常重要的分組方式,通過這種流分組方式,我們就可以做到讓Storm產出的消息在這個"user-id"級別是嚴格有序的,這對一些對時序敏感的應用(例如,計費系統)是非常重要的。
- Partial Key grouping: 跟字段分組一樣,流也是用指定的分組字段進行分組的,但是在多個下游Bolt之間是有負載均衡的,這樣當輸入數據有傾斜時可以更好的利用資源。這篇論文很好的解釋了這是如何工作的,有哪些優勢。
- All grouping: 流會復制給Bolt的所有任務。小心使用這種分組方式。在拓撲中,如果希望某類元祖發送到所有的下游消費者,就可以使用這種All grouping的流分組策略。
- Global grouping: 整個流會分配給Bolt的一個任務。具體一點,會分配給有最小ID的任務。
- 不分組(None grouping): 說明不關心流是如何分組的。目前,None grouping等價於洗牌分組。
- Direct grouping:一種特殊的分組。對於這樣分組的流,元組的生產者決定消費者的哪個任務會接收處理這個元組。只能在聲明做直連的流(direct streams)上聲明Direct groupings分組方式。只能通過使用
emitDirect
系列函數來吐元組給直連流。一個Bolt可以通過提供的TopologyContext
來獲得消費者的任務ID,也可以通過OutputCollector對象的emit
函數(會返回元組被發送到的任務的ID)來跟蹤消費者的任務ID。在ack的實現中,Spout有兩個直連輸入流,ack和ackFail,使用了這種直連分組的方式。 - Local or shuffle grouping:如果目標Bolt在同一個worker進程里有一個或多個任務,元組就會通過洗牌的方式分配到這些同一個進程內的任務里。否則,就跟普通的洗牌分組一樣。這種方式的好處是可以提高拓撲的處理效率,因為worker內部通信就是進程內部通信了,相比拓撲間的進程間通信要高效的多。worker進程間通信是通過使用Netty來進行網絡通信的。
資源###
- TopologyBuilder: 使用這個類來定義拓撲
- InputDeclarer: 當調用
TopologyBuilder
的setBolt
函數時會返回這個對象,它用來聲明一個Bolt的輸入流並指定流的分組方式。 - CoordinatedBolt: 這個Bolt對於分布式的RPC拓撲很有用,大量使用了直連流(direct streams)和直連分組(direct groupings)
可靠性(Reliability)##
Storm保證了拓撲中Spout產生的每個元組都會被處理。Storm是通過跟蹤每個Spout所產生的所有元組構成的樹形結構並得知這棵樹何時被完整地處理來達到可靠性。每個拓撲對這些樹形結構都有一個關聯的“消息超時”。如果在這個超時時間里Storm檢測到Spout產生的一個元組沒有被成功處理完,那Sput的這個元組就處理失敗了,后續會重新處理一遍。
為了發揮Storm的可靠性,需要你在創建一個元組樹中的一條邊時告訴Storm,也需要在處理完每個元組之后告訴Storm。這些都是通過Bolt吐元組數據用的OutputCollector
對象來完成的。標記是在emit
函數里完成,完成一個元組后需要使用ack
函數來告訴Storm。
這些都在“保證消息處理”一文中會有更詳細的介紹。
Workers(工作進程)##
拓撲以一個或多個Worker進程的方式運行。每個Worker進程是一個物理的Java虛擬機,執行拓撲的一部分任務。例如,如果拓撲的並發設置成了300,分配了50個Worker,那么每個Worker執行6個任務(作為Worker內部的線程)。Storm會盡量把所有的任務均分到所有的Worker上。
資源###
- Config.TOPOLOGY_WORKERS: 這個配置設置了執行拓撲時分配Worker的數量。
Storm中用到的技術##
ZeroMQ 提供了可擴展環境下的傳輸層高效消息通信,一開始Storm的內部通信使用的是ZeroMQ,后來作者想把Storm移交給Apache開源基金會來管理,而ZeroMQ的許可證書跟Apache基金會的政策有沖突。在Storm中,Netty比ZeroMQ更加高效,而且提供了worker間通信時的驗證機制,所以在Storm0.9中,就改用了Netty。
Clojure Storm系統的實現語言。Clojure是由Rich Hicky作為一種通用語言發明的,它衍生自Lisp語言,簡化了多線程編程。
Apache ZooKeeper Zookeeper是一個實現高可靠的分布式協作的開源項目。Storm使用Zookeeper來協調集群中的多個節點。
參考資料#
Storm簡介 這是淘寶主搜索的技術博客,文章通俗易懂
History of Apache Storm and lessons learned 推薦大家讀一讀,是Storm作者Nathan Marz寫的,文章講述了Storm的構思、創建過程和Storm的市場營銷,溝通交流和社區開發的故事。
twitter storm --IBM developerworks
Process real-time big data with Twitter Storm
Spark, an alternative for fast data analytics
如果您看了本篇博客,覺得對您有所收獲,請點擊右下角的“推薦”,讓更多人看到!

