什么是Apache Storm
Apache Storm是一個分布式實時大數據處理系統。Storm設計用於在容錯和水平可擴展方法中處理大量數據。它是一個流數據框架,具有最高的攝取率。雖然Storm是無狀態的,它通過Apache ZooKeeper管理分布式環境和集群狀態。通過Storm可以並行地對實時數據執行各種操作。Storm易於部署和操作,並且它可以保證每個消息將通過拓撲至少處理一次。
Apache Storm核心概念
Apache Storm從一端讀取實時數據的原始流,並將其傳遞通過一系列小處理單元,並在另一端輸出處理/有用的信息。
下圖描述了Apache Storm的核心概念。
Apache Storm的組件
Tuple
Tuple是Storm中的主要數據結構。它是有序元素的列表。默認情況下,Tuple支持所有數據類型。通常,它被建模為一組逗號分隔的值,並傳遞到Storm集群。
Stream
流是元組的無序序列。
Spouts
流的源。通常,Storm從原始數據源(如Twitter Streaming API,Apache Kafka隊列,Kestrel隊列等)接受輸入數據。否則,您可以編寫spouts以從數據源讀取數據。“ISpout”是實現spouts的核心接口,一些特定的接口是IRichSpout,BaseRichSpout,KafkaSpout等。
Bolts
Bolts是邏輯處理單元。Spouts將數據傳遞到Bolts和Bolts過程,並產生新的輸出流。Bolts可以執行過濾,聚合,加入,與數據源和數據庫交互的操作。Bolts接收數據並發射到一個或多個Bolts。 “IBolt”是實現Bolts的核心接口。一些常見的接口是IRichBolt,IBasicBolt等。
拓撲
Spouts和Bolts連接在一起,形成拓撲結構。實時應用程序邏輯在Storm拓撲中指定。簡單地說,拓撲是有向圖,其中頂點是計算,邊緣是數據流。
簡單拓撲從spouts開始。Spouts將數據發射到一個或多個Bolts。Bolt表示拓撲中具有最小處理邏輯的節點,並且Bolts的輸出可以發射到另一個Bolts作為輸入。
Storm保持拓撲始終運行,直到您終止拓撲。Apache Storm的主要工作是運行拓撲,並在給定時間運行任意數量的拓撲。
任務
現在你有一個關於Spouts和Bolts的基本想法。它們是拓撲的最小邏輯單元,並且使用單個Spout和Bolt陣列構建拓撲。應以特定順序正確執行它們,以使拓撲成功運行。Storm執行的每個Spout和Bolt稱為“任務”。簡單來說,任務是Spouts或Bolts的執行。
進程
拓撲在多個工作節點上以分布式方式運行。Storm將所有工作節點上的任務均勻分布。工作節點的角色是監聽作業,並在新作業到達時啟動或停止進程。
流分組
數據流從Spouts流到Bolts,或從一個Bolts流到另一個Bolts。流分組控制元組在拓撲中的路由方式,並幫助我們了解拓撲中的元組流。有如下分組:
- Shuffle grouping: Tuples are randomly distributed across the bolt's tasks in a way such that each bolt is guaranteed to get an equal number of tuples.
- Fields grouping: The stream is partitioned by the fields specified in the grouping. For example, if the stream is grouped by the "user-id" field, tuples with the same "user-id" will always go to the same task, but tuples with different "user-id"'s may go to different tasks.
- Partial Key grouping: The stream is partitioned by the fields specified in the grouping, like the Fields grouping, but are load balanced between two downstream bolts, which provides better utilization of resources when the incoming data is skewed. This paper provides a good explanation of how it works and the advantages it provides.
- All grouping: The stream is replicated across all the bolt's tasks. Use this grouping with care.
- Global grouping: The entire stream goes to a single one of the bolt's tasks. Specifically, it goes to the task with the lowest id.
- None grouping: This grouping specifies that you don't care how the stream is grouped. Currently, none groupings are equivalent to shuffle groupings. Eventually though, Storm will push down bolts with none groupings to execute in the same thread as the bolt or spout they subscribe from (when possible).
- Direct grouping: This is a special kind of grouping. A stream grouped this way means that the producer of the tuple decides which task of the consumer will receive this tuple. Direct groupings can only be declared on streams that have been declared as direct streams. Tuples emitted to a direct stream must be emitted using one of the [emitDirect](javadocs/org/apache/storm/task/OutputCollector.html#emitDirect(int, int, java.util.List) methods. A bolt can get the task ids of its consumers by either using the provided TopologyContext or by keeping track of the output of the
emit
method in OutputCollector (which returns the task ids that the tuple was sent to). - Local or shuffle grouping: If the target bolt has one or more tasks in the same worker process, tuples will be shuffled to just those in-process tasks. Otherwise, this acts like a normal shuffle grouping.
Apache Storm集群架構
Apache Storm的主要亮點是,它是一個容錯,快速,沒有“單點故障”(SPOF)分布式應用程序。我們可以根據需要在多個系統中安裝Apache Storm,以增加應用程序的容量。
讓我們看看Apache Storm集群如何設計和其內部架構。下圖描述了集群設計。
Apache Storm有兩種類型的節點,Nimbus(主節點)和Supervisor(工作節點)。Nimbus是Apache Storm的核心組件。Nimbus的主要工作是運行Storm拓撲。Nimbus分析拓撲並收集要執行的任務。然后,它將任務分配給可用的supervisor。
Supervisor將有一個或多個工作進程。Supervisor將任務委派給工作進程。工作進程將根據需要產生盡可能多的執行器並運行任務。Apache Storm使用內部分布式消息傳遞系統來進行Nimbus和管理程序之間的通信。Storm廣泛使用Thrift協議進行內部通信和數據定義。Storm拓撲只是Thrift Structs。在Apache Storm中運行拓撲的Storm Nimbus是一個Thrift服務。
Nimbus(主節點)
Nimbus是Storm集群的主節點。集群中的所有其他節點稱為工作節點。主節點負責在所有工作節點之間分發數據,向工作節點分配任務和監視故障。
Supervisor(工作節點)
遵循指令的節點被稱為Supervisors。Supervisor有多個工作進程,它管理工作進程以完成由nimbus分配的任務。
Worker process(工作進程)
工作進程將執行與特定拓撲相關的任務。工作進程不會自己運行任務,而是創建執行者(Executor)並要求他們執行特定的任務。工作進程將有多個執行者。
Executor(執行者)
執行者只是工作進程產生的單個線程。執行者運行一個或多個任務,但僅用於特定的spout或bolt。
Task(任務)
任務執行實際的數據處理。所以,它是一個spout或bolt。
ZooKeeper framework(ZooKeeper框架)
Apache的ZooKeeper的是使用群集(節點組)自己和維護具有強大的同步技術共享數據之間進行協調的服務。Nimbus是無狀態的,所以它依賴於ZooKeeper來監視工作節點的狀態。
ZooKeeper幫助supervisor與nimbus交互,它負責維持nimbus,supervisor的狀態。
Storm是無狀態的。即使無狀態性質有它自己的缺點,它實際上幫助Storm以最好的可能和最快的方式處理實時數據。
Storm雖然不是完全無狀態的。它將其狀態存儲在Apache ZooKeeper中。由於狀態在Apache ZooKeeper中可用,故障的網絡可以重新啟動,並從它離開的地方工作。通常,像monitor這樣的服務監視工具將監視Nimbus,並在出現任何故障時重新啟動它。
Apache Storm工作流程
一個工作的Storm集群應該有一個Nimbus和一個或多個supervisors。另一個重要的節點是Apache ZooKeeper,它將用於nimbus和supervisors之間的協調。
現在讓我們仔細看看Apache Storm的工作流程 −
- 最初,nimbus將等待“Storm拓撲”提交給它。
- 一旦提交拓撲,它將處理拓撲並收集要執行的所有任務和任務將被執行的順序。
- 然后,nimbus將任務均勻分配給所有可用的supervisors。
- 在特定的時間間隔,所有supervisor將向nimbus發送心跳以通知它們仍然運行着。
- 當supervisor終止並且不向心跳發送心跳時,則nimbus將任務分配給另一個supervisor。
- 當nimbus本身終止時,supervisor將在沒有任何問題的情況下對已經分配的任務進行工作。
- 一旦所有的任務都完成后,supervisor將等待新的任務進去。
- 同時,終止nimbus將由服務監控工具自動重新啟動。
- 重新啟動的網絡將從停止的地方繼續。同樣,終止supervisor也可以自動重新啟動。由於網絡管理程序和supervisor都可以自動重新啟動,並且兩者將像以前一樣繼續,因此Storm保證至少處理所有任務一次。
- 一旦處理了所有拓撲,則網絡管理器等待新的拓撲到達,並且類似地,管理器等待新的任務。
默認情況下,Storm集群中有兩種模式:
- 本地模式 -此模式用於開發,測試和調試,因為它是查看所有拓撲組件協同工作的最簡單方法。在這種模式下,我們可以調整參數,使我們能夠看到我們的拓撲如何在不同的Storm配置環境中運行。在本地模式下,storm拓撲在本地機器上在單個JVM中運行。
- 生產模式 -在這種模式下,我們將拓撲提交到工作Storm集群,該集群由許多進程組成,通常運行在不同的機器上。如在storm的工作流中所討論的,工作集群將無限地運行,直到它被關閉。
Storm使用經驗分享
1.使用組件的並行度代替線程池或額外的線程
Storm自身是一個分布式、多線程的框架,對每個Spout和Bolt,我們都可以設置其並發度;它也支持通過rebalance命令來動態調整並發度,把負載分攤到多個Worker上。
如果自己在組件內部采用線程池做一些計算密集型的任務,比如JSON解析,有可能使得某些組件的資源消耗特別高,其他組件又很低,導致Worker之間資源消耗不均衡,這種情況在組件並行度比較低的時候更明顯。
比如某個Bolt設置了1個並行度,但在Bolt中又啟動了線程池,這樣導致的一種后果就是,集群中分配了這個Bolt的Worker進程可能會把機器的資源都給消耗光了,影響到其他Topology在這台機器上的任務的運行。如果真有計算密集型的任務,我們可以把組件的並發度設大,Worker的數量也相應提高,讓計算分配到多個節點上。
為了避免某個Topology的某些組件把整個機器的資源都消耗光的情況,除了不在組件內部啟動線程池來做計算以外,也可以通過CGroup控制每個Worker的資源使用量。
不要在組件內部使用使用額外的線程,比如啟動了額外的線程或Timer去處理邏輯,Storm並不保證額外的線程中處理數據的線程安全。
2.不要用DRPC批量處理大數據
RPC提供了應用程序和Storm Topology之間交互的接口,可供其他應用直接調用,使用Storm的並發性來處理數據,然后將結果返回給調用的客戶端。這種方式在數據量不大的情況下,通常不會有問題,而當需要處理批量大數據的時候,問題就比較明顯了。
(1)處理數據的Topology在超時之前可能無法返回計算的結果。
(2)批量處理數據,可能使得集群的負載短暫偏高,處理完畢后,又降低回來,負載均衡性差。
批量處理大數據不是Storm設計的初衷,Storm考慮的是時效性和批量之間的均衡,更多地看中前者。需要准實時地處理大數據量,可以考慮Spark等批量框架。
3.不要在Spout中處理耗時的操作
Spout中nextTuple方法會發射數據流,在啟用Ack的情況下,fail方法和ack方法會被觸發。需要明確一點,在Storm中Spout是單線程(JStorm的Spout分了3個線程,分別執行nextTuple方法、fail方法和ack方法)。如果nextTuple方法非常耗時,某個消息被成功執行完畢后,Acker會給Spout發送消息,Spout若無法及時消費,可能造成ACK消息超時后被丟棄,然后Spout反而認為這個消息執行失敗了,造成邏輯錯誤。反之若fail方法或者ack方法的操作耗時較多,則會影響Spout發射數據的量,造成Topology吞吐量降低。
4.注意fieldsGrouping的數據均衡性
fieldsGrouping是根據一個或者多個Field對數據進行分組,不同的目標Task收到不同的數據,而同一個Task收到的數據會相同。假設某個Bolt根據用戶ID對數據進行fieldsGrouping,如果某一些用戶的數據特別多,而另外一些用戶的數據又比較少,那么就可能使得下一級處理Bolt收到的數據不均衡,整個處理的性能就會受制於某些數據量大的節點。可以加入更多的分組條件或者更換分組策略,使得數據具有均衡性。
5.優先使用localOrShuffleGrouping
localOrShuffleGrouping是指如果目標Bolt中的一個或者多個Task和當前產生數據的Task在同一個Worker進程里面,那么就走內部的線程間通信,將Tuple直接發給在當前Worker進程的目的Task。否則,同shuffleGrouping。
localOrShuffleGrouping的數據傳輸性能優於shuffleGrouping,因為在Worker內部傳輸,只需要通過Disruptor隊列就可以完成,沒有網絡開銷和序列化開銷。因此在數據處理的復雜度不高,而網絡開銷和序列化開銷占主要地位的情況下,可以優先使用localOrShuffleGrouping來代替shuffleGrouping。
6.設置合理的MaxSpoutPending值
在啟用Ack的情況下,Spout中有個RotatingMap用來保存Spout已經發送出去,但還沒有等到Ack結果的消息。RotatingMap的最大個數是有限制的,為p*num-tasks。其中p是topology.max.spout.pending值,也就是MaxSpoutPending(也可以由TopologyBuilder在setSpout通過setMaxSpoutPending方法來設定),num-tasks是Spout的Task數。如果不設置MaxSpoutPending的大小或者設置得太大,可能消耗掉過多的內存導致內存溢出,設置太小則會影響Spout發射Tuple的速度。
7.設置合理的Worker數
Worker數越多,性能越好?並不是!
這是由於一方面,每新增加一個Worker進程,都會將一些原本線程間的內存通信變為進程間的網絡通信,這些進程間的網絡通信還需要進行序列化與反序列化操作,這些降低了吞吐率。
另一方面,每新增加一個Worker進程,都會額外地增加多個線程(Netty發送和接收線程、心跳線程、System Bolt線程以及其他系統組件對應的線程等),這些線程切換消耗了不少CPU,sys 系統CPU消耗占比增加,在CPU總使用率受限的情況下,降低了業務線程的使用效率。
8.平衡吞吐量和時效性
Storm的數據傳輸默認使用Netty。在數據傳輸性能方面,有如下的參數可以調整:
storm.messaging.netty.server_worker_threads和storm.messaging.netty.client_worker_threads分別為接收消息線程和發送消息線程的數量。
netty.transfer.batch.size是指每次 Netty Client向 Netty Server發送的數據的大小,如果需要發送的Tuple消息大於netty.transfer.batch.size,則Tuple消息會按照netty.transfer.batch.size進行切分,然后多次發送。
storm.messaging.netty.buffer_size為每次批量發送的Tuple序列化之后的TaskMessage消息的大小。
storm.messaging.netty.flush.check.interval.ms表示當有TaskMessage需要發送的時候, Netty Client檢查可以發送數據的頻率。降低storm.messaging.netty.flush.check.interval.ms的值,可以提高時效性。增加netty.transfer.batch.size和storm.messaging.netty.buffer_size的值,可以提升網絡傳輸的吐吞量,使得網絡的有效載荷提升(減少TCP包的數量,並且TCP包中的有效數據量增加),通常時效性就會降低一些。因此需要根據自身的業務情況,合理在吞吐量和時效性直接的平衡。
除了這些參數,我們怎么找到Storm中性能的瓶頸,可以通過如下的一些途徑來進行:
在Storm的UI中,對每個Topology都提供了相應的統計信息,其中有3個參數對性能來說參考意義比較明顯,包括Execute latency、Process latency和Capacity。
分別看一下這3個參數的含義和作用。
(1)Execute latency:消息的平均處理時間,單位為毫秒。
(2)Process latency:消息從收到到被ack掉所花的時間,單位為毫秒。如果沒有啟用Acker機制,那么Process latency的值為0。
(3)Capacity:計算公式為Capacity = Bolt或者Executor調用execute方法處理的消息數量 * 消息平均執行時間 / 時間區間。這個值越接近1,說明Bolt或者Executor基本一直在調用execute方法,因此並行度不夠,需要擴展這個組件的Executor數量。
為了在Storm中達到高性能,我們在設計和開發Topology的時候,需要注意以下原則:
(1)模塊和模塊之間解耦,模塊之間的層次清晰,每個模塊可以獨立擴展,並且符合流水線的原則。
(2)無狀態設計,無鎖設計,水平擴展支持。
(3)為了達到高的吞吐量,延遲會加大;為了低延遲,吞吐量可能降低,需要在二者之間平衡。
(4)性能的瓶頸永遠在熱點,解決熱點問題。
(5)優化的前提是測量,而不是主觀臆測。收集相關數據,再動手,事半功倍。
參考:
https://www.w3cschool.cn/apache_storm/apache_storm_introduction.html
https://zhuanlan.zhihu.com/p/20504669
http://storm.apache.org/index.html