在這個教程中,你將學會如何創建 Storm 的topology並將他們部署到 Storm 集群上, 主要的語言是 Java,但是少數幾個例子用 Python 編寫來說明 Storm 的多語言支持能力。
術語和名詞
MapReduce jobs
topologies topology 由用戶編寫的Storm集群中的業務處理邏輯
deamon 守護進程
worker process 工作進程
stream 流 指Storm中的數據流
tuple 元組 指stream中的最小單元數據
primitive 基件 指storm topology 的組成部分,比如 bolt 和 spout
task 任務
Storm 集群里的各種組件
從表面上看一個 Storm 集群 與 一個 Hadoop 集群相似,然而在 Hadoop 上運行 “MapReduce jobs”, 在 Storm 上運行 “topologies”, 但是 “jobs” 和 “topologies” 是非常不同的– 一個關鍵的不同是 MapReduce job 最終會結束,而一個 topology 是永遠在等待消息並處理(直到你殺掉它)。
一個 Storm 集群中有兩種節點(node):主節點和工作節點,主節點運行一個叫 “Nimbus” 的守護進程(daemon)跟 Hadoop 的 “任務跟蹤器”(Jobtracker)類似。Nimbus 負責向集群中分發代碼, 向各機器分配任務,以及監測故障。
這里的節點是指Storm集群中不同角色的服務器節點
每一個工作節點運行一個名叫 “Supervisor” 的守護進程。Supervisor 監聽 Nimbus 指派到這個這台機器的任務,根據 Numbus 的指派信息來停啟工作進程(worker process) ,每一個 worker process 執行一個topology的子集,一個運行中的topology由跨越多個主機的多個 worker process 組成。
在 Nimbus 和 Supervisors 之間的所有協調調度通過一個 Zookeeper 集群來完成。另外,Nimbus 守護進程和 Supervisor 守護進程都是快速失敗 (fail-fast)和無狀態的;所有的狀態保存在 Zookeeper 或者本地磁盤中。這意味着你可以 kill -9
Nimbus 或者 Supervisors 他們會自動恢復,就像什么都沒發生過一樣。這種設計讓 Storm 集群變的不可思議的穩定。
Topologies
在Strom上做實時計算, 你需要創建 “Topology”,一個 topology 是一個計算過程的描述,一個 topology 中的每一個節點包含處理邏輯,節點之間的連接表明了數據在節點之間是如何傳遞的。
這里的節點是指 topology 中計算過程的每一個步驟
運行一個 是很簡單的。首先,你將你所有的代碼和依賴都打包到一個單獨的jar包中,然后運行像下面這樣的命令:
storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2
這樣會傳遞arg1
和 arg2
參數給backtype.storm.MyTopology
類,這個類的 main
方法定義topology 並將它提交到 Nimbus。Strom jar
部分負責連接 Nimbus 並上傳jar包.
由於 topology 的定義本來就是 Thrift 結構,並且 Nimbus 是一個 Thrift 服務, 所以你可以使用任何編程語言來創建和提交 topology。上面的方法是使用基於 JVM 的編程語言來完成的最簡單的方法,參考Running topologies on a production cluster 來獲取更多的關於開啟和停止 topology 的方法。
Streams
Strom 的核心抽象概念是 “流” (stream),一個 stream 相當於一個無限的元組(tuple) 序列,Storm 提供了以可靠且分布式的方法來將一個 stream 轉換成一個新 stream 的基件 (primitive) ,例如你可能想將一個微博的 stream 來轉成一個熱門話題的 stream。
Storm提供基本的用來做流轉換的的基件是 “spout” 和 “bolts” ,spout 和 bolt 提供了接口,你可以實現這些接口來處理的你自己的應用程序相關的邏輯。
spout 是流的來源, 例如 spout 可以從一個 Kestrel 隊列來讀 tuple 並且發射(emit)他們形成一個流,或者 spout 可以連接到 Twitter api,來發射一個推文的流。
一個 bolt 消費任意數量的流, 做一些處理,然后可能會發射出新的流,復雜的流轉換,例如從一個推文的流計算出一個熱門話題的流,需要多個步驟,多個 bolt 。bolt可以通過運行函數(functions)來做任何事,例如過濾元組,做流聚合,做流連接,跟數據庫交互等等。
所有的 spout 和 bolt 被打包到了一個 “topology” 中 ,topology 是你提交給 Storm 集群來執行的計算過程的最高抽象,一個 topology 類似一個流轉換的圖表,它現顯示了哪些 bolt 是綁定(subscribe)哪些 stream 上的 。當一個 spout 或者 bolt 發射出一個 tuple 到 stream 中,它會發送 tuple 到所有綁定了這個 stream 的 bolt 中。
topology 中節點之間的連接表明了 tuple 是如何在他們之間傳遞的。例如如果在 spout A 和 bolt B 之間有一個連接,從 spout A 到 bolt C 之間有一個連接,從 boltB 到 boltC 有一個連接,tuple 會發到 bolt B 和 bolt C 中, 所有 bolt B 的輸出 tuple 也會流到 bolt C 中
這里的節點是指 topology 中的 spout 或者 bolt
topology中的每一個節點都是並行執行的。在你的topology中,你可以指定每個節點的並行數量n,然后 Storm會啟動 n 個線程在集群中運行
一個 topology 是永遠運行的,直到你殺掉它,Storm 會自動重新分配失敗的任務。另外,Storm 保證沒有數據丟失, 即使主機掛掉消息丟失。
數據模型
Storm 使用 tuple 做數據模型,一個 tuple 是被命名過的值列表,一個 tuple 中的字段可以是任何類型的對象。它是開箱即用的,Storm 支持所有的簡單數據類型,如字符串,字節數組作為 tuple 的字段值。如果要使用另一種類型的對象,你只需要為這個類型實現一個 serializer
topology 中的每一個節點都應該為它要發射的元組聲明輸出字段, 例如, 下面這個bolt聲明了它發射字段為 “double” 和 “triple” 字段的元組:
1 |
public class DoubleAndTripleBolt extends BaseRichBolt { |
declareOutputFields
方法聲明了輸出字段為["double", "triple"]
,
這個 bolt 類的其他部分將在下面的章節中講解。
一個簡單的topology
讓我們來看一個簡單的 topology 來探索更多的概念,看代碼是如何構造起來的。我們從 storm-starter 項目里看看 ExclamationTopology
是如何定義的
1 |
TopologyBuilder builder = new TopologyBuilder(); |
這個 topology 包含一個 spout 和兩個 bolt,spout 發送單詞,每一個 bolt 附加 “!!!” 到它的輸入數據中。這些節點排練成一條線:spout 先發射 tuple 到第一個 bolt,然后第一個 bolt 發送到第二個 bolt。如果 spout 發送 [“bob”] 和 [“john”] 元組,然后第二個bolt會發送 [“bob!!!!!!”] 和 [“john!!!!!!”] 元組
代碼中使用 setSpout
和 setBolt
方法來定義節點.這些方法需要傳入一個用戶指定的id,一個包含處理邏輯的對象,以及你希望這個節點運行的並行數量。在這個例子中,spout 被指定了id “words”, bolt 被指定了id “exclaim1” 和 “exclaim2”
傳入的 Spout 對象實現了 IRichSpout 接口並包含業務邏輯
傳入的 Bolt 對象實現了 IRichBolt 接口並包含業務邏輯
最后一個參數,你想要這個節點的並行數量是幾,這個參數是可選的,它表明有多少線程應該在集群中運行該 組件 ,如果你忽略了它,Storm 會給這個節點只分配一個線程
這里的組件是指被實例化后的節點,即 spout 或者 bolt
setBolt
返回一個 InputDeclarer 對象用來給 bolt 定義輸入。這 “exclaim1”組件 聲明了它要想讀入所有 “words” 組件的發射的打亂分組過的所有 tuple.
“exclaim2” 組件聲明了它要讀入所有 “exclaim1” 發射的打亂分組過的 tuple,”打亂分組”(shuffile group)意味着 tuple 必須從輸入中隨機分發到 bolt 的任務中。有許多在組件之間將數據分組的方法,打亂只是其中一種。接下來的一些小節會解釋到它。
如果你希望 “exclaim2” 組件,既讀取 “words” 又讀取 “exclaim1” 發射的 tuple , 你可以像如下這樣實現 “excliam2” :
1 |
builder.setBolt("exclaim2", new ExclamationBolt(), 5) |
正如你所見,可以給 bolt 鏈式的指定多個數據源。
讓我們深入到這個 topology 中 spouts 和 bolts 的具體實現上。Spouts 負責發射新的消息到 topology中, 在這個 topology 中 TestWordSpouts
方法 從 [“nathan”, “mike”, “jackson”, “golda”, “bertels”] 中每 100毫秒 發射一個隨機的字符, TestWordSpout 中 nextTuple()
方法 的實現是這樣的:
1 |
public void nextTuple() { |
如你所見,這種實現非常的簡單。
ExclamationBolt
附加 “!!!” 到輸入中, 讓我們看看 ExclamationBolt
的完整實現:
1 |
public static class ExclamationBolt implements IRichBolt { |
prepare
方法給 bolt 提供了一個 OutputCollector
對象用來從這個 bolt 中發射 tuple 。 在這個 bolt 中的任何位置都可以發射 tuples – prepare
, execute
, cleanup
方法, 甚至在異步的其他線程中。prepare
方法僅僅保持一個 OutputCollector
對象實例以便在后面 execute
方法中調用。
execute
方法從輸入中接收一個 tuple。ExclamationBolt
從元組中取到第一個字段,然后在后面附加 “!!!” 。 如果你實現的 bolt 訂閱了多個輸入源, 你可以使用 Tuple#getSourceComponent
方法查到當前的 tuple 是來自哪個組件.
execute
方法里還可以做一些其他操作,即將輸入的 tuple 作為 emit 的第一個參數傳入,這樣這個 tuple 會被確認。這是 Storm 可靠api一部分它能保證,不會丟失數據,這些在本教程后面的章節中還會闡述。
cleanup
方法會在 Bolt 停止時被調用,用來關閉清理所有打開的資源。不能保證這個方法一定會在集群中被調用,如果正在運行的機器發生了爆炸(作者在搞笑),這樣就沒辦法調用這個方法了。cleanup
方法其實是專門為你在本地模式(將Storm集群在一個進程中模擬出來)下運行 topology ,你希望運行和殺掉 topology 而不必擔心資源泄露。
declareOutputFields
方法聲明 ExclamationBolt
發射包含一個 word 字段的 tuple
getComponentConfiguration
方法允許你配置影響這個 bolt 如何運行的各種參數,有一個更高級的話題專門討論關於配置的更多內容 Configuration.
像cleanup
和 getComponentConfiguration
方法通常並不是必須的, 你可以通過繼承一個提供了默認實現的基類來更簡潔的定義 bolt。 通過繼承 BaseRichBolt
類 ,ExclamationBolt
可以被實現的更簡潔,如下:
1 |
public static class ExclamationBolt extends BaseRichBolt { |
在本地模式下運行 ExclamationTopology
我們來看下如何在本地模式下運行 ExclamationTopology
Storm 有兩種運行模式:本地模式和分布式模式。在本地模式中,Storm 完全在一個進程中運行,用線程來模擬各個工作節點。本地模式對與開發和測試topology是非常有用的,當你運行 storm-starter 中的 topology時,它會運行在本地模式下,你可以看到每一個組件發射的消息,你可以閱讀更多關於本地模式的內容
在分布式模式下,Storm 運行在一組機器上,當你提交一個 topology 到 master上,就會同時提交所有必要的代碼來運行這個 topology,master會負責分發你的代碼,並分配工作進程來運行你的 topology,如果工作進程掛掉了,master會在某處重新分配他們。你可以閱讀更多關於在一個集群上來運行topology的內容,
下面是在本地模式運行 ExclamationTopology
的代碼
1 |
Config conf = new Config(); |
首先,這段代碼通過創建 LocalCluster
對象定義了一個進程內的集群。提交 topology 到虛擬集群和提交到分布式集群是一樣的,通過調用 submitTopology
來向 LocalCluster
中提交 topology,它接受三個參數,topology的名字,topology的配置,topology本身。
名字是用來識別這個 topology,以便日后殺掉它。。topology會一直運行直到你殺掉它。
配置是用來調優運行 topology 的各個方面,下面是兩個常見的配置:
- TOPOLOGY_WORKERS (用
setNumWorkers
來設置) 指定你將在集群分配幾個進程來運行這個這個topology,topology中的每一個組件會被當做多個線程來運行。一個組件被分配線程的數量通過setBolt
和setSpout
方法來配置,這些線程存在於工作進程中。每個工作進程包含一些組件中的一些線程,例如,你分配了 300 個線程給所有的組件,在配置中設置了50個工作進程,那么每個工作進程會運行6個線程,每一個線程可能屬於不同的組件。通過調整每個元件的並行度和運行這些線程的工作進程的數量來對 storm 的並行性能調優。 - TOPOLOGY_DEBUG (通過 setDebug 設置),當被設為 true 時,storm 將記錄元件發射的每個消息,在本地模式測試topology時這是很有用的,但是在線上模式運行時,你更願意將它關閉
流分組 Stream groupings
流分組讓 topology 知道在組件之間如何發送 tuple,記住 spouts 和 bolts 是被當成很多 tasks 並行運行在整個集群中的,如果你想看看 topology 是如何在 task 層級運行的,就像下圖這樣
這里的task 就是setBolt和 setSpout 中產生的工作線程,如果設置了數量,就是線程組或者任務組即 set of tasks
當一個運行 Bolt A 的 task 發射了一個 tuple 到 Bolt B,那么它應該發射到哪個 task(當然是運行Bolt B 的task) 呢?
流分組 (Stream grouping)答了這個問題,它告訴 Storm 如何在 set of task(任務組)之間發送 tuple,在我們深入不同種類的流分組以前,讓我們看看 storm-start 里的另一個 topology ,WordCountTopology從一個 spout 中讀取句子並且從 WordCountBolt
中獲取某個單詞出現的次數:
1 |
TopologyBuilder builder = new TopologyBuilder(); |
SplitSentence
把它接收到的每一個句子中的每一個單詞當做 tuple 發送出去,WordCount
在內存中維護了一個單詞和數量的映射關系,每次 WordCount
接收到一個單詞,它就更新單詞的數量,然后發送新的單詞數量。
當然還有一些不同種類的流分組。
基本的分組類型叫做 “亂序分組(shuffle grouping)” ,它將使 tuple 被隨機發個一個 task,WordCountTopology
中 使用了亂序分組來從 RandomSentenceSpout
向 SplitSentence
發送 tuple, 這樣所有的處理任務就能夠被平均的分配到所有運行 SplitSentence
Bolt的 task 上。
一個更有趣的分組類型是 字段分組(fields grouping)
,SplitSentence
和 WordCount
之間使用了一個字段分組,WordCount
能夠運作的一個極為重要的要求是相同的單詞必須被發到同一個 task中,否則會有一個以上的 task 會接收到相同的單詞,然后他們會發射錯誤的計數。字段分組使我們可以用字段將一個流分組,這使得相同字段的內容總是被分到同一個task中。由於 WordCount
在 word
字段上使用字段分組訂閱了 SplitSentence
‘s 的輸出流,這樣相同的單詞總是會進入到相同的task.
字段分組是流連接和流聚合以及許多其他用力的基本實現,究其原理,字段分組是通過 mod hashing(哈希的一種) 來實現的
還有一些其他類型的分組,你可以在概念里查看更多。
使用其他編程語言編寫 Bolt
Bolt 可以使用其他編程語言編寫,使用其他語言編寫的 Bolt 會被當做子進程來執行,Storm 通過 stdin/stdout 用json格式的信息來與這些子進程通信,只需要引入一個100行左右代碼的適配器類庫即可完成通信協議, Storm 提供了Ruby , Python 等等語言的類庫
下面是從 WordCountTopology
到 SplitSentence
的實現
1 |
public static class SplitSentence extends ShellBolt implements IRichBolt { |
SplitSentence
重寫了父類的ShellBolt
方法, 聲明它用 splitsentence.py
作為參數來運行python
, 下面是splitsentence.py
的實現
1 |
import storm |
關於如何用其他語言編寫 spouts 和 bolts 以及如何用其他語言編寫 topology 的內容,請查閱
Using non-JVM languages with Storm.
保證消息處理的可靠性
教程的前面我們略過了一些 tuple 發射方面的內容,這些方面的內容就是 Storm 的可靠性 API, 即 Storm 是如何保證從 spout 中出來的信息都能夠被完全的處理,閱讀 Guaranteeing message processing f 來了解它是如何運作的,以及作為一個用戶如何利用 Storm 的可靠性能力。
Transactional Topologies
Storm guarantees that every message will be played through the topology at least once. A common question asked is “how do you do things like counting on top of Storm? Won’t you overcount?” Storm has a feature called transactional topologies that let you achieve exactly-once messaging semantics for most computations. Read more about transactional topologies here.
分布式 RPC
這篇教程解釋了如何在 Storm 上做基本的流處理。當然你還可以用 Storm 的基礎組件做更多的事。其中一個非常有趣的應用是分布式 RPC,在這些 RPC機器上做繁忙的並行計算,閱讀更多關於分布式RPC
總結
這個教程給出了 如何開發、測試以及部署 Storm topology 的概覽,其余的文檔將深入到使用 Storm 使用的方方面面。