Storm特性###
1. 低延遲和高性能
在一個小集群中,每個節點每秒可以處理數以百萬計的消息。
2. 可擴展
在Storm集群中主要有三個實體:工作進程、線程和任務。Storm集群中每台機器上都可以運行多個工作進程,每個工作進程又可以創建多個線程,每個線程可以執行多個任務,任務是真正進行數據處理的實體。
3. 高可靠性
Storm可以保證Spout發出的每條消息都能被完全處理,Spout發出的消息后續可能會觸發產生成千上萬條消息,可以形象的理解為一棵消息樹,只有當這顆消息樹中的所有消息都被處理了才叫“完全處理”,這種特殊的策略會在后面詳細介紹。
4. 高容錯性
如果消息在處理過程中出現了一些異常,Storm會重新部署這個出問題的處理單元,Storm保證一個處理單元永遠運行(除非你顯式結束這個處理單元)
5. 編程模型簡單
Storm為大數據的實時計算提供了一些簡單優美的原語,大大降低了開發並行實時處理任務的復雜性。
6. 支持多種編程語言
7. 支持本地模式
Storm與Hadoop的比較###
Hadoop架構簡介
Hadoop架構的核心組成部分是HDFS(Hadoop Distributed File System,Hadoop分布式文件系統)和MapReduce分布式計算框架。HDFS采用Master/Slave體系結構,在集群中由一個主節點充當NameNode,負責文件系統元數據的管理,其它多個子節點充當Datanode,負責存儲實際的數據塊。
MapReduce分布式計算模型由JobTracker和TaskTracker兩類服務進程實現,JobTracker負責任務的調度和管理,TaskTracker負責實際任務的執行。
Hadoop架構的瓶頸
在手機閱讀BI大屏時延項目中,業務需求為處理業務平台產生的海量用戶數據,展現業務中PV(Page View,頁面瀏覽量)、UV(Unique Visitor,獨立訪客)、營收和付費用戶數等關鍵運營指標,供領導層實時了解運營狀況,做出經營決策。在一期項目的需求描述中,允許的計算時延是15分鍾。
根據需求,在一期項目的實施中,搭建了Hadoop平台與Hive數據倉庫,通過編寫Hive存儲過程,來完成數據的處理,相當於是一個離線的批處理過程。不同的運營指標擁有不同的算法公式,各公式的復雜程度不同導致各運營指標算法復雜度不同,因此所需要的計算時延也各不相同,如PV指標的計算公式相對簡單,可以在5分鍾內完成計算,而頁面訪問成功率指標的計算公式相對復雜,需要10分鍾以上才能完成計算。項目到達二期階段時,對實時性的要求有了進一步提高,允許的計算時延減少到5分鍾。在這種應用場景下,Hadoop架構已經不能滿足需要,無法在指定的時延內完成所有運營指標的計算。
在以上的應用場景中,Hadoop的瓶頸主要體現在以下兩點:
- MapReduce計算框架初始化較為耗時,並不適合小規模的批處理計算。因為MapReduce框架並非輕量級框架,在運行一個作業時,需要進行很多初始化的工作,主要包括檢查作業的輸入輸出路徑,將作業的輸入數據分塊,建立作業統計信息以及將作業代碼的Jar文件和配置文件拷貝到HDFS上。當輸入數據的規模很大時,框架初始化所耗費的時間遠遠小於計算所耗費的時間,所以初始化的時間可以忽略不計;而當輸入數據的規模較小時,初始化所耗費的時間甚至超過了計算所耗費的時間,導致計算效率低下,產生了性能上的瓶頸。
- Reduce任務的計算速度較慢。有的運營指標計算公式較為復雜,為之編寫的Hive存儲過程經Hive解釋器解析后產生了Reduce任務,導致無法在指定的時延內完成計算。這是由於Reduce任務的計算過程分為三個階段,分別是copy階段,sort階段和reduce階段。其中copy階段要求每個計算節點從其它所有計算節點上抽取其所需的計算結果,copy操作需要占用大量的網絡帶寬,十分耗時,從而造成Reduce任務整體計算速度較慢。
Storm架構簡介
與Hadoop主從架構一樣,Storm也采用Master/Slave體系結構,分布式計算由Nimbus和Supervisor兩類服務進程實現,Nimbus進程運行在集群的主節點,負責任務的指派和分發,Supervisor運行在集群的從節點,負責執行任務的具體部分。
- Nimbus:負責資源分配和任務調度。
- Supervisor:負責接受nimbus分配的任務,啟動和停止屬於自己管理的worker進程。
- Worker:運行具體處理組件邏輯的進程。
- Task:worker中每一個spout/bolt的線程稱為一個task。同一個spout/bolt的task可能會共享一個物理線程,該線程稱為executor。
Storm架構中使用Spout/Bolt編程模型來對消息進行流式處理。消息流是Storm中對數據的基本抽象,一個消息流是對一條輸入數據的封裝,源源不斷輸入的消息流以分布式的方式被處理。Spout組件是消息生產者,是Storm架構中的數據輸入源頭,它可以從多種異構數據源讀取數據,並發射消息流。Bolt組件負責接收Spout組件發射的信息流,並完成具體的處理邏輯。在復雜的業務邏輯中可以串聯多個Bolt組件,在每個Bolt組件中編寫各自不同的功能,從而實現整體的處理邏輯。
Storm架構與Hadoop架構對比
在Hadoop架構中,主從節點分別運行JobTracker和TaskTracker進程,在Storm架構中,主從節點分別運行Nimbus和Supervisor進程。在Hadoop架構中,應用程序的名稱是Job,Hadoop將一個Job解析為若干Map和Reduce任務,每個Map或Reduce任務都由一個Child進程來運行,該Child進程是由TaskTracker在子節點上產生的子進程。
在Storm架構中,應用程序的名稱是Topology,Storm將一個Topology划分為若干個部分,每部分由一個Worker進程來運行,該Worker進程是Supervisor在子節點上產生的子進程,在每個Worker進程中存在着若干Spout和Bolt線程,分別負責Spout和Bolt組件的數據處理過程。
從應用程序的比較中可以看明顯地看到Hadoop和Storm架構的主要不同之處。在Hadoop架構中,應用程序Job代表着這樣的作業:輸入是確定的,作業可以在有限時間內完成,當作業完成時Job的生命周期走到終點,輸出確定的計算結果。而在Storm架構中,Topology代表的並不是確定的作業,而是持續的計算過程。在確定的業務邏輯處理框架下,輸入數據源源不斷地進入系統,經過流式處理后以較低的延遲產生輸出。如果不主動結束這個Topology或者關閉Storm集群,那么數據處理的過程就會持續地進行下去。
通過以上的分析,我們可以看到,Storm架構是如何解決Hadoop架構瓶頸的:
- Storm的Topology只需初始化一次。在將Topology提交到Storm集群的時候,集群會針對該Topology做一次初始化的工作。此后,在Topology運行過程中,對於輸入數據而言,是沒有計算框架初始化耗時的,有效避免了計算框架初始化的時間損耗。
- Storm使用ZeroMQ作為底層的消息隊列來傳遞消息,保證消息能夠得到快速的處理。同時Storm采用內存計算模式,無需借助文件存儲,直接通過網絡直傳中間計算結果,避免了組件之間傳輸數據的大量時間損耗。
Storm基本概念###
Storm集群和Hadoop集群表面上看很類似。但是Hadoop上運行的是MapReduce jobs,而在Storm上運行的是拓撲(topology),這兩者之間是非常不一樣的。一個關鍵的區別是: 一個MapReduce job最終會結束, 而一個topology永遠會運行(除非你手動kill掉)。
在Storm的集群里面有兩種節點: 控制節點(master node)和工作節點(worker node)。控制節點上面運行一個叫Nimbus后台程序,它的作用類似Hadoop里面的JobTracker。Nimbus負責在集群里面分發代碼,分配計算任務給機器,並且監控狀態。
每一個工作節點上面運行一個叫做Supervisor的節點。Supervisor會監聽分配給它那台機器的工作,根據需要啟動/關閉工作進程worker。每一個工作進程執行一個topology的一個子集;一個運行的topology由運行在很多機器上的很多工作進程worker組成。
Nimbus和Supervisor之間的所有協調工作都是通過Zookeeper集群完成。另外,Nimbus進程和Supervisor進程都是快速失敗(fail-fast)和無狀態的。所有的狀態要么在zookeeper里面, 要么在本地磁盤上。這也就意味着你可以用kill -9來殺死Nimbus和Supervisor進程, 然后再重啟它們,就好像什么都沒有發生過。這個設計使得Storm異常的穩定。
Topology
計算任務Topology是由不同的Spouts和Bolts,通過數據流(Stream)連接起來的圖。下面是一個Topology的結構示意圖:
其中包含有:
-
Spout:Storm中的消息源,用於為Topology生產消息(數據),一般是從外部數據源(如Message Queue、RDBMS、NoSQL、Realtime Log)不間斷地讀取數據並發送給Topology消息(tuple元組)。Spout可以是可靠的,也可以是不可靠的。如果這個tuple沒有被Storm完全處理,可靠的消息源可以重新發射一個tuple,但是不可靠的消息源一旦發出一個tuple就不能重發了。(可靠性會在下面介紹)
Spout類里面最重要的方法是nextTuple。要么發射一個新的tuple到topology里面或者簡單的返回(如果已經沒有新的tuple)。要注意的是nextTuple方法不能阻塞,因為storm在同一個線程上面調用所有消息源spout的方法。
另外兩個比較重要的spout方法是ack和fail。storm在檢測到一個tuple被整個topology成功處理的時候調用ack,否則調用fail。storm只對可靠的spout調用ack和fail。 -
Bolt:Storm中的消息處理者,用於為Topology進行消息的處理,Bolt可以執行過濾, 聚合, 查詢數據庫等操作,而且可以一級一級的進行處理。
下圖是Topology的提交流程圖:
下圖是Storm的數據交互圖。可以看出兩個模塊Nimbus和Supervisor之間沒有直接交互。狀態都是保存在Zookeeper上。Worker之間通過ZeroMQ傳送數據。(storm所有的元數據信息保存在Zookeeper中。)
Worker (進程)
一個topology可能會在一個或者多個worker(工作進程)里面執行,每個worker是一個物理JVM並且執行整個topology的一部分。比如,對於並行度是300的topology來說,如果我們使用50個工作進程worker來執行,那么每個工作進程會處理其中的6個tasks。Storm會盡量均勻的工作分配給所有的worker。setBolt 的最后一個參數是你想為bolts的並行量。
一個topology的worker數量是conf設置的,這個設置是說給這個topo多少個worker資源給他。
Tasks
每一個spout和bolt會被當作很多task在整個集群里執行。每一個executor對應到一個線程,在這個線程上運行多個task,而stream grouping則是定義怎么從一堆task發射tuple到另外一堆task。你可以調用TopologyBuilder類的setSpout和setBolt來設置並行度。