Storm內部的消息傳遞機制


作者:Jack47

轉載請保留作者和原文出處

歡迎關注我的微信公眾賬號程序員傑克,兩邊的文章會同步,也可以添加我的RSS訂閱源

一個Storm拓撲,就是一個復雜的多階段的流式計算。Storm中的組件(Component)就是對各個階段的一個抽象,其中的Spout是生產者的角色,它負責源源不斷地從Storm外部接收消息,扔給下游的組件處理,下游組件處理完成后,最終輸出到外部的存儲系統。

本文主要講解消息在Storm內部的各個組件(Component)之間如何進行傳遞,本文適用於JStorm 2.1.0以后的版本。對於JStorm各個版本的改進,見這里

如果讀者對Storm的基本組成部分如Spout,Bolt,Worker進程和Task不了解,可以先看下 Storm介紹(一)Storm介紹(二)“理解Storm拓撲並發”

下文中使用的"消息"(Messagesage)和“元組"(Tuple)兩個詞語其實是同一個意思。

Storm中的每個組件,可以用一個三元組來定義

<inputStreamId, boltImpl, outputStreamInfo>

其中inputStreamId定義了這個組件所消費的流的ID; boltImpl是bolt的具體實現的類;outputStreamInfo代表這個組件的輸出流的信息,包含兩部分:輸出流的ID和路由方式。所以當一個組件發射一個消息后,通過這個流的分組策略(Grouping)就可以立馬計算出消費它的taskId。

拿下面中的拓撲為例,從圖一看到這個拓撲由三個組件構成:藍色的Spout,消費方是Green Bolt,Green Bolt的消費方是Yellow Bolt。這個拓撲在集群上的運行時的狀態如圖二所示:
它由兩個Worker進程組成,每個Work里面運行4個Task。仔細的讀者已經發現了這里沒有Executor,這是JStorm和Storm很大的不同。JStorm認為Executor的存在收益比太低,雖然它支持不停機動態擴大Task的數量,但同時增加了理解成本,增加了應用開發人員編程的復雜度,所以JStorm中去掉了Executor。

->
Storm_component<-

圖一 一個實際拓撲組件的構成

->
Storm_assign_example<-

圖二 集群上運行的一個實際拓撲

在Storm拓撲內部,同一個拓撲的多個Worker之間會發生消息傳遞,比如上圖二中的兩個Worker進程,他們之間的通信就是進程間的通信了,發送的消息需要經過序列化和反序列化。Storm中,Worker之間使用Netty進行網絡通信。

在Storm拓撲的一個Worker進程內部,多個Task之間也會進行通信。比如上圖二中的Task 6和Task 3。Storm中Worker進程內部的消息通信依賴於LMAX Disruptor這個高性能線程間通信的消息通信庫。

Storm內部的消息傳遞

JStorm與Storm在內部消息傳遞機制上的主要差別:
JStorm中獨立出一個線程來專門負責消息的反序列化,這樣執行線程單獨執行,而不是Storm那樣,一個線程負責執行反序列化並執行用戶的邏輯。相當於是把流水線拆解的更小了。這樣對於反序列化時延跟執行時延在同一個數量級的應用性能提升比較明顯。

Storm_Intra_Communication

圖三:JStorm內部消息隊列的概要圖

從圖里看到隊列都是綠色的,這些隊列都是某個Worker內部的隊列。為了可讀性只保留了一個Worker進程(一個storm節點一般都運行多個Worker),而且在這個Worker進程里只畫了一個Task,(再次的,在一個Worker進程里通常有多個Task)

詳細解釋

每個Worker進程有一個NettyServer,它監聽在Worker的TCP端口上(通過 supervisor.slots.ports來配置),其他需要跟它通信的Worker會作為NettyClient分別建立連接。當NettyServer接收到消息,會根據taskId參數把消息放到對應的反序列化隊列(DeserializedQueue)里面。 topology.executor.receive.buffer.size決定了反序列化隊列的大小。TaskReceiver中的反序列化線程專門負責消費反序列化隊列中的消息:先將消息反序列化,然后放到執行隊列(Execute Queue)中去。

執行隊列的消費者是BoltExecutor線程,它負責從隊列中取出消息,執行用戶的代碼邏輯。執行完用戶的代碼邏輯后,最終通過OutputCollect輸出消息,此時消息里已經生成了目標task的taskId。topology.executor.receive.buffer.size決定了執行隊列的大小。可以看到JStorm中執行隊列跟反序列化隊列的大小是同一個配置項,即它們是一致的。

仔細看了圖三的同學會發現執行隊列的生產者除了TaskReceiver外,還有一個。這種消息的來源就是Worker內部的其他Task的TaskTransfer。

輸出的消息通過TaskTransfer來發送,如果目標Task是Worker內部的Task,就直接扔到目標Task的執行隊列中去。如果目標Task是在其他Worker上,那就先放到序列化隊列中,然后由單獨的一個線程專門負責序列化,然后通過NettyClient發送出去。topology.executor.send.buffer.size決定了序列化隊列的大小。

每個Worker進程有多個NettyClient,他們負責與其他的Worker進行網絡通信。

延伸閱讀

如何配置Storm的內部消息緩存

上面提到的眾多配置項都在conf/defaults.yaml里有定義。可以通過在Storm集群的conf/storm.yaml里進行配置來全局的覆值。也可以通過Storm的Java API backtype.storm.Config 來對單個的Storm拓撲進行配置。

如何配置Storm的並發###

Storm消息緩存的正確配置不但是和你的拓撲的負載類型緊密關聯的,而且和拓撲的並發度有很大關系。后者的詳細信息見理解Storm並發一文。

了解Storm拓撲上發生了什么?

Storm UI是一個用於觀察你正在運行的拓撲的關鍵指標的良好開端。例如,它可以給你展示所謂的Spout/Bolt的“容量”。眾多的指標可以幫助你決定對本文中提到的眾多緩存相關的配置參數的修改,對你的拓撲的運行效率的影響是正向的還是負向的。更多信息見"運行一個多節點Storm集群"一文。

除此之外還可以注冊應用程序自己的指標並使用類似Graphite這類工具來跟蹤它們。詳細信息見“從Storm發送指標到Graphite"和“通過RPM和Supervisord來安裝和運行Graphite"。

性能調優方面的建議

可以看Storm主要作者Nathan Marz的演講:調優和上線Storm拓撲

看看其他沒有提到的參數:
topology.spout.max.batch.size
數據容量(data volume),數據速度(velocity),消息的大小,處理一個消息的計算復雜度。所以需要不斷調整才能找到最佳的參數,沒有銀彈。

參考資料:

Understanding storm internal message buffers


如果您看了本篇博客,覺得對您有所收獲,請點擊右下角的“推薦”,讓更多人看到!
資助Jack47寫作,打賞一個雞蛋灌餅錢吧
pay_weixin
微信打賞
pay_alipay
支付寶打賞


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM