storm 架構與原理
1 storm簡介
1.1 storm是什么
-
如果只用一句話來描述 storm 是什么的話:分布式 && 實時 計算系統。按照作者 Nathan Marz 的說法,storm對於實時計算的意義類似於hadoop對於批處理的意義。
-
Hadoop(大數據分析領域無可爭辯的王者)專注於批處理。這種模型對許多情形(比如為網頁建立索引)已經足夠,但還存在其他一些使用模型,它們需要來自高度動態的來源的實時信息。為了解決這個問題,就得借助 Nathan Marz 推出的 storm(現在已經被Apache孵化)storm 不處理靜態數據,但它處理連續的流數據。
1.2 storm 與傳統的大數據
-
storm 與其他大數據解決方案的不同之處在於它的處理方式。Hadoop 在本質上是一個批處理系統。數據被引入 Hadoop 文件系統 (HDFS) 並分發到各個節點進行處理。當處理完成時,結果數據返回到 HDFS 供始發者使用。storm 支持創建拓撲結構來轉換沒有終點的數據流。不同於 Hadoop 作業,這些轉換從不停止,它們會持續處理到達的數據。
-
Hadoop 的核心是使用 Java™ 語言編寫的,但支持使用各種語言編寫的數據分析應用程序。而 Twitter Storm 是使用 Clojure語言實現的。
-
Clojure 是一種基於虛擬機 (VM) 的語言,在 Java 虛擬機上運行。但是,盡管 storm 是使用 Clojure 語言開發的,您仍然可以在 storm 中使用幾乎任何語言編寫應用程序。所需的只是一個連接到 storm 的架構的適配器。已存在針對 Scala,JRuby,Perl 和 PHP 的適配器,但是還有支持流式傳輸到 Storm 拓撲結構中的結構化查詢語言適配器。
2 Hadoop 架構的瓶頸
- Hadoop是優秀的大數據離線處理技術架構,主要采用的思想是“分而治之”,對大規模數據的計算進行分解,然后交由眾多的計算節點分別完成,再統一匯總計算結果。Hadoop架構通常的使用方式為批量收集輸入數據,批量計算,然后批量吐出計算結果。然而Hadoop結構在處理實時性要求較高的業務時,卻顯得力不從心。本章內容對Hadoop架構這種瓶頸的產生原因進行了探究。 實時性不足(基於離線)
2.1 Hadoop架構簡介
-
Hadoop架構的核心組成部分是HDFS(Hadoop Distributed File System,Hadoop分布式文件系統)和MapReduce分布式計算框架。HDFS采用Master/Slave體系結構,在集群中由一個主節點充當NameNode,負責文件系統元數據的管理,其它多個子節點充當Datanode,負責存儲實際的數據塊。
-
MapReduce分布式計算模型由JobTracker和TaskTracker兩類服務進程實現,JobTracker負責任務的調度和管理,TaskTracker負責實際任務的執行。
2.2 Hadoop架構的瓶頸
-
在手機閱讀BI大屏時延項目中,業務需求為處理業務平台產生的海量用戶數據,展現業務中PV(Page View,頁面瀏覽量)、UV(Unique Visitor,獨立訪客)。營收和付費用戶數等關鍵運營指標,供領導層實時了解運營狀況,做出經營決策,在一期項目的需求描述中,允許的計算時延是15分鍾。
-
根據需求,在一期項目的實施中,搭建了Hadoop平台與Hive數據倉庫,通過編寫Hive存儲過程,來完成數據的處理,相當於是一個離線的批處理過程,不同的運營指標擁有不同的算法公式,各公式的復雜程度不同導致各運營指標算法復雜度不同,因此所需要的計算時延也各不相同,如PV指標的計算公式相對簡單,可以在5分鍾內完成計算,而頁面訪問成功率指標的計算公式相對復雜,需要10分鍾以上才能完成計算。項目到達二期階段時,對實時性的要求有了進一步提高,允許的計算時延減少到5分鍾,在這種應用場景下,Hadoop架構已經不能滿足需要,無法在指定的時延內完成所有運營指標的計算。
- 在以上的應用場景中,Hadoop的瓶頸主要體現在以下兩點:
- 1) MapReduce計算框架初始化較為耗時,並不適合小規模的批處理計算。因為MapReduce框架並非輕量級框架,在運行一個作業時,需要進行很多 初始化 的工作,主要包括檢查作業的輸入輸出路徑,將作業的輸入數據分塊,建立作業統計信息以及將作業代碼的Jar文件和配置文件拷貝到HDFS上,當輸入數據的規模很大時,框架初始化所耗費的時間遠遠小於計算所耗費的時間,所以初始化的時間可以忽略不計;而當輸入數據的規模較小時,初始化所耗費的時間甚至超過了計算所耗費的時間,導致計算效率低下,產生了性能上的瓶頸。
- 2) Reduce任務的計算速度較慢。有的運營指標計算公式較為復雜,為之編寫的Hive存儲過程經Hive解釋器解析后產生了Reduce任務,導致無法在指定的時延內完成計算。這是由於Reduce任務的計算過程分為三個階段,分別是copy階段,sort階段和reduce階段。其中copy階段要求每個計算節點從其它所有計算節點上抽取其所需的計算結果,copy操作需要占用大量的網絡帶寬,十分耗時,從而造成Reduce任務整體計算速度較慢。
3 storm 架構的優點
- storm的流式處理計算模式保證了任務能夠只進行一次初始化,就能夠持續計算,同時使用了ZeroMQ(Netty)作為底層消息隊列,有效地提高了整體架構的數據處理效率,避免了Hadoop的瓶頸。
- Storm的適用場景:
- 1.流數據處理,Storm可以用來處理源源不斷流進來的消息,處理之后將結果寫入到某個存儲中去。
- 2.分布式rpc,由於storm的處理組件是分布式的,而且處理延遲極低,所以可以作為一個通用的分布式rpc框架來使用。
- 3.持續計算,任務一次初始化,一直運行,除非你手動kill它。
3.1 storm架構的設計
- 與Hadoop主從架構一樣,Storm也采用Master/Slave體系結構,分布式計算由Nimbus和Supervisor兩類服務進程實現,Nimbus進程運行在集群的主節點,負責任務的指派和分發,Supervisor運行在集群的從節點,負責執行任務的具體部分。
- 如圖所示:
- Nimbus:負責資源分配和任務調度。
- Supervisor:負責接受nimbus分配的任務,啟動和停止屬於自己管理的worker進程。
- Worker:運行具體處理組件邏輯的進程。
- Task:worker中每一個spout/bolt的線程稱為一個task。同一個spout/bolt的task可能會共享一個物理線程,該線程稱為executor。
- storm架構中使用Spout/Bolt編程模型來對消息進行流式處理。消息流是storm中對數據的基本抽象,一個消息流是對一條輸入數據的封裝,源源不斷輸入的消息流以分布式的方式被處理,Spout組件是消息生產者,是storm架構中的數據輸入源頭,它可以從多種異構數據源讀取數據,並發射消息流,Bolt組件負責接收Spout組件發射的信息流,並完成具體的處理邏輯。在復雜的業務邏輯中可以串聯多個Bolt組件,在每個Bolt組件中編寫各自不同的功能,從而實現整體的處理邏輯。
3.2 storm架構與Hadoop架構的對比
-
storm架構與Hadoop架構的總體結構相似。
結構 Hadoop Storm 主節點 JobTracker Nimbus 從節點 TaskTracker Supervisor 應用程序 Job Topology 工作進程名稱 Child Worker 計算模型 Map / Reduce Spout / Bolt -
在Hadoop架構中,主從節點分別運行JobTracker和TaskTracker進程,在storm架構中,主從節點分別運行Nimbus和Supervisor進程。在Hadoop架構中,應用程序的名稱是Job,Hadoop將一個Job解析為若干Map和Reduce任務,每個Map或Reduce任務都由一個Child進程來運行,該Child進程是由TaskTracker在子節點上產生的子進程。
-
在Storm架構中,應用程序的名稱是Topology,Storm將一個Topology划分為若干個部分,每部分由一個Worker進程來運行,該Worker進程是Supervisor在子節點上產生的子進程,在每個Worker進程中存在着若干Spout和Bolt線程,分別負責Spout和Bolt組件的數據處理過程。
-
從應用程序的比較中可以看明顯地看到Hadoop和Storm架構的主要不同之處。在Hadoop架構中,應用程序Job代表着這樣的作業:輸入是確定的,作業可以在有限時間內完成,當作業完成時Job的生命周期走到終點,輸出確定的計算結果;而在Storm架構中,Topology代表的並不是確定的作業,而是持續的計算過程,在確定的業務邏輯處理框架下,輸入數據源源不斷地進入系統,經過流式處理后以較低的延遲產生輸出。如果不主動結束這個Topology或者關閉Storm集群,那么數據處理的過程就會持續地進行下去。
- 通過以上的分析,我們可以看到,storm架構是如何解決Hadoop架構瓶頸的:
- Storm的Topology只需初始化一次。在將Topology提交到Storm集群的時候,集群會針對該Topology做一次初始化的工作,此后,在Topology運行過程中,對於輸入數據而言,是沒有計算框架初始化耗時的,有效避免了計算框架初始化的時間損耗。
- Storm使用Netty作為底層的消息隊列來傳遞消息,保證消息能夠得到快速的處理,同時Storm采用內存計算模式,無需借助文件存儲,直接通過網絡直傳中間計算結果,避免了組件之間傳輸數據的大量時間損耗。
3.3 storm的優點
- Storm 實現的一些特征決定了它的性能和可靠性的,Storm 使用 Netty 傳送消息,這就消除了中間的排隊過程,使得消息能夠直接在任務自身之間流動,在消息的背后,是一種用於序列化和反序列化 Storm 的原語類型的自動化且高效的機制。
-
Storm 的一個最有趣的地方是它注重容錯和管理,Storm 實現了有保障的消息處理,所以每個元組(Turple)都會通過該拓撲(Topology)結構進行全面處理;如果發現一個元組還未處理,它會自動從Spout處重發,Storm 還實現了任務級的故障檢測,在一個任務發生故障時,消息會自動重新分配以快速重新開始處理。Storm 包含比 Hadoop 更智能的處理管理,流程會由zookeeper來進行管理,以確保資源得到充分使用。
- 總結一下,有以下優點:
- 簡單編程,在大數據處理方面相信大家對hadoop已經耳熟能詳,基於Google Map/Reduce來實現的Hadoop為開發者提供了map、reduce原語,使並行批處理程序變得非常地簡單和優美。同樣,Storm也為大數據的實時計算提供了一些簡單優美的原語,這大大降低了開發並行實時處理的任務的復雜性,幫助你快速、高效的開發應用。
- 多語言支持,除了用java實現spout和bolt,你還可以使用任何你熟悉的編程語言來完成這項工作,這一切得益於Storm所謂的多語言協議。多語言協議是Storm內部的一種特殊協議,允許spout或者bolt使用標准輸入和標准輸出來進行消息傳遞,傳遞的消息為單行文本或者是json編碼的多行。
- 支持水平擴展,在Storm集群中真正運行topology的主要有三個實體:工作進程、線程和任務。Storm集群中的每台機器上都可以運行多個工作進程,每個工作進程又可創建多個線程,每個線程可以執行多個任務,任務是真正進行數據處理的實體,我們開發的spout、bolt就是作為一個或者多個任務的方式執行的。因此,計算任務在多個線程,進程和服務器之間並行進行,支持靈活的水平擴展。
- 容錯性強,如果在消息處理過程中出了一些異常,Storm會重新安排這個出問題的處理單元,Storm保證一個處理單元永遠運行(除非你顯式殺掉這個處理單元)。
- 可靠性的消息保證 Storm可以保證spout發出的每條消息都能被“完全處理”。
- 快速的消息處理,用Netty作為底層消息隊列, 保證消息能快速被處理。
- 本地模式,支持快速編程測試。
4 其他大數據解決方案
- 自 Google 在 2004 年推出 MapReduce 范式以來,已誕生了多個使用原始 MapReduce 范式(或擁有該范式的質量)的解決方案。Google 對 MapReduce 的最初應用是建立萬維網的索引。盡管此應用程序仍然很流行,但這個簡單模型解決的問題也正在增多。
- 下標中提供了一個可用開源大數據解決方案的列表,包括傳統的批處理和流式處理應用程序。在將 Storm 引入開源之前將近一年的時間里,Yahoo! 的 S4 分布式流計算平台已向 Apache 開源。S4 於 2010 年 10 月發布,它提供了一個高性能計算 (HPC) 平台,向應用程序開發人員隱藏了並行處理的復雜性。S4 實現了一個可擴展的、分散化的集群架構,並納入了部分容錯功能。
解決方案 | 開發商 | 類型 | 描述 |
---|---|---|---|
storm | 流式處理 | Twitter 的新流式大數據分析解決方案 | |
S4 | Yahoo! | 流式處理 | 來自 Yahoo! 的分布式流計算平台 |
Hadoop | Apache | 批處理 | MapReduce 范式的第一個開源實現 |
Spark | UC Berkeley AMPLab | 批處理 | 支持內存中數據集和恢復能力的最新分析平台 |
Disco | Nokia | 批處理 | Nokia 的分布式 MapReduce 框架 |
HPCC | LexisNexis | 批處理 | HPC 大數據集群 |
5 storm基本概念
-
下面介紹Storm的基本概念和數據流模型。Storm是一個開源的實時計算系統,它提供了一系列的基本元素用於進行計算:Topology、Stream、Spout、Bolt等等。
-
Storm集群和Hadoop集群表面上看很類似。但是Hadoop上運行的是MapReduce jobs,而在Storm上運行的是拓撲(topology),這兩者之間是非常不一樣的,一個關鍵的區別是: 一個MapReduce job最終會結束, 而一個topology永遠會運行(除非你手動kill掉)。
-
在Storm的集群里面有兩種節點: 控制節點(master node)和工作節點(worker node)。控制節點上面運行一個叫Nimbus后台程序,它的作用類似Hadoop里面的JobTracker,Nimbus負責在集群里面分發代碼,分配計算任務給機器,並且監控狀態。每一個工作節點上面運行一個叫做Supervisor的進程。Supervisor會監聽分配給它那台機器的工作,根據需要啟動/關閉工作進程worker。每一個工作進程執行一個topology的一個子集;一個運行的topology由運行在很多機器上的很多工作進程worker組成。(一個supervisor里面有多個workder,一個worker是一個JVM。可以配置worker的數量,對應的是conf/storm.yaml中的supervisor.slot的數量)
- Nimbus和Supervisor之間的所有協調工作都是通過Zookeeper集群完成。另外,Nimbus進程和Supervisor進程都是快速失敗(fail-fast)和無狀態的。所有的狀態要么在zookeeper里面, 要么在本地磁盤上。這也就意味着你可以用kill -9來殺死Nimbus和Supervisor進程,然后再重啟它們,就好像什么都沒有發生過,這個設計使得Storm異常的穩定。
5.1 Topology
- 在Storm中,一個實時應用的計算任務被打包作為Topology發布,這同Hadoop的MapReduce任務相似。但是有一點不同的是:在Hadoop中,MapReduce任務最終會執行完成后結束;而在Storm中,Topology任務一旦提交后永遠不會結束,除非你顯示去停止任務。計算任務Topology是由不同的Spouts和Bolts,通過數據流(Stream)連接起來的圖。下面是一個Topology的結構示意圖:
- 其中包含有:
- Spout:Storm中的消息源,用於為Topology生產消息(數據),一般是從外部數據源(如Message Queue、RDBMS、NoSQL、Realtime Log )不間斷地讀取數據並發送給Topology消息(tuple元組)。
- Bolt:Storm中的消息處理者,用於為Topology進行消息的處理,Bolt可以執行過濾,聚合, 查詢數據庫等操作,而且可以一級一級的進行處理。
-
下圖是Storm的數據交互圖,可以看出兩個模塊Nimbus和Supervisor之間沒有直接交互。狀態都是保存在Zookeeper上,Worker之間通過Netty傳送數據。Storm與Zookeeper之間的交互過程,暫時不細說了。重要的一點:storm所有的元數據信息保存在Zookeeper中!
5.2 數據模型Turple
-
storm使用tuple來作為它的數據模型。每個tuple是一堆值,每個值有一個名字,並且每個值可以是任何類型,在我的理解里面一個tuple可以看作一個java對象。總體來看,storm支持所有的基本類型:字符串以及字節數組作為tuple的值類型。你也可以使用你自己定義的類型來作為值類型,只要你實現對應的序列化器(serializer)。
-
一個Tuple代表數據流中的一個基本的處理單元,它可以包含多個Field,每個Field表示一個屬性。比如舉例一個,三個字段(taskID:int; StreamID:String; ValueList: List):
-
Tuple是一個Key-Value的Map,由於各個組件間傳遞的tuple的字段名稱已經事先定義好了,所以Tuple只需要按序填入各個Value,所以就是一個Value List。一個沒有邊界的,源源不斷的,連續的Tuple序列就組成了Stream。
- topology里面的每個節點必須定義它要發射的tuple的每個字段。
5.3 worker(進程)
- 一個topology可能會在一個或者多個worker(工作進程)里面執行,每個worker是一個物理JVM並且執行整個topology的一部分。比如,對於並行度是300的topology來說,如果我們使用50個工作進程worker來執行,那么每個工作進程會處理其中的6個tasks。Storm會盡量均勻的工作分配給所有的worker,setBolt 的最后一個參數是你想為bolts的並行量。
5.4 Spouts
-
消息源spout是Storm里面一個topology里面的消息生產者。一般來說消息源會從一個外部源讀取數據並且向topology里面發出消息:tuple。Spout可以是可靠的也可以是不可靠的,如果這個tuple沒有被storm成功處理,可靠的消息源spouts可以重新發射一個tuple,但是不可靠的消息源spouts一旦發出一個tuple就不能重發了。
-
消息源可以發射多條消息流stream。使用OutputFieldsDeclarer。declareStream來定義多個stream,然后使用SpoutOutputCollector來發射指定的stream。代碼上是這樣的:collector.emit(new Values(str));
-
Spout類里面最重要的方法是nextTuple。要么發射一個新的tuple到topology里面或者簡單的返回如果已經沒有新的tuple。要注意的是nextTuple方法不能阻塞,因為storm在同一個線程上面調用所有消息源spout的方法。另外兩個比較重要的spout方法是ack和fail。storm在檢測到一個tuple被整個topology成功處理的時候調用ack,否則調用fail。storm只對可靠的spout調用ack和fail。
5.5 Bolts
-
所有的消息處理邏輯被封裝在bolts里面。Bolts可以做很多事情:過濾,聚合,查詢數據庫等等。
-
Bolts可以簡單的做消息流的傳遞(來一個元組,調用一次execute)。復雜的消息流處理往往需要很多步驟,從而也就需要經過很多bolts。比如算出一堆圖片里面被轉發最多的圖片就至少需要兩步:第一步算出每個圖片的轉發數量,第二步找出轉發最多的前10個圖片。(如果要把這個過程做得更具有擴展性那么可能需要更多的步驟)。
-
Bolts可以發射多條消息流, 使用OutputFieldsDeclarer.declareStream定義stream,使用OutputCollector.emit來選擇要發射的stream。
-
Bolts的主要方法是execute,它以一個tuple作為輸入,bolts使用OutputCollector來發射tuple(spout使用SpoutOutputCollector來發射指定的stream),bolts必須要為它處理的每一個tuple調用OutputCollector的ack方法,以通知Storm這個tuple被處理完成了,從而通知這個tuple的發射者spouts。一般的流程是: bolts處理一個輸入tuple, 發射0個或者多個tuple, 然后調用ack通知storm自己已經處理過這個tuple了。storm提供了一個IBasicBolt會自動調用ack。
5.6 Reliability
- Storm保證每個tuple會被topology完整的執行。Storm會追蹤由每個spout tuple所產生的tuple樹(一個bolt處理一個tuple之后可能會發射別的tuple從而形成樹狀結構),並且跟蹤這棵tuple樹什么時候成功處理完。每個topology都有一個消息超時的設置,如果storm在這個超時的時間內檢測不到某個tuple樹到底有沒有執行成功,那么topology會把這個tuple標記為執行失敗,並且過一會兒重新發射這個tuple(超時的時間在storm0.9.0.1版本中是可以設置的,默認是30s)。
5.7 Tasks
- 每一個spout和bolt會被當作很多task在整個集群里執行。每一個executor對應到一個線程,在這個線程上運行多個task,而stream grouping則是定義怎么從一堆task發射tuple到另外一堆task。你可以調用TopologyBuilder類的setSpout和setBolt來設置並行度。SetSpout里面的並行度參數含義:parallelism_hint the number of tasks that should be assigned to execute this spout. Each task will run on a thread in a process somwehere around the cluster。(執行這個spout安排了N個tasks。每個task是一個線程,他們都在同一個進程中。)setBolt的參數含義也是一樣的。
6 Storm數據流模型
- 數據流(Stream)是Storm中對數據進行的抽象,它是時間上無界的tuple元組序列。在Topology中,Spout是Stream的源頭。負責為Topology從特定數據源發射Stream;Bolt可以接收任意多個Stream作為輸入,然后進行數據的加工處理過程,如果需要,Bolt還可以發射出新的Stream給下級Bolt進行處理。下面是一個Topology內部Spout和Bolt之間的數據流關系:
- Topology中每一個計算組件(Spout和Bolt)都有一個並行執行度,在創建Topology時可以進行指定,Storm會在集群內分配對應並行度個數的線程來同時執行這一組件。那么,有一個問題:既然對於一個Spout或Bolt,都會有多個task線程來運行,那么如何在兩個組件(Spout和Bolt)之間發送tuple元組呢?Storm提供了若干種數據流分發(Stream Grouping)策略用來解決這一問題。在Topology定義時,需要為每個Bolt指定接收什么樣的Stream作為其輸入(注:Spout並不需要接收Stream,只會發射Stream)。目前Storm中提供了以下7種Stream Grouping策略:Shuffle Grouping、Fields Grouping、All Grouping、Global Grouping、Non Grouping、Direct Grouping、Local or shuffle grouping。
6.1 Stream groupings
- Storm里面有7種類型的stream grouping
- Shuffle Grouping: 隨機分組, 隨機派發stream里面的tuple,保證每個bolt接收到的tuple數目大致相同。
- Fields Grouping:按字段分組,比如按userid來分組,具有同樣userid的tuple會被分到相同的Bolts里的一個task。而不同的userid則會被分配到不同的bolts里的task。
- All Grouping:廣播發送,對於每一個tuple,所有的bolts都會收到。
- Global Grouping:全局分組, 這個tuple被分配到storm中的一個bolt的其中一個task,再具體一點就是分配給id值最低的那個task。
- Non Grouping:不分組,這個分組的意思是說stream不關心到底誰會收到它的tuple。目前這種分組和Shuffle grouping是一樣的效果。有一點不同的是storm會把這個bolt放到這個bolt的訂閱者同一個線程里面去執行。
- Direct Grouping: 直接分組, 這是一種比較特別的分組方法,用這種分組意味着消息的發送者指定由消息接收者的哪個task處理這個消息。。只有被聲明為Direct Stream的消息流可以聲明這種分組方法。而且這種消息tuple必須使用emitDirect方法來發射。消息處理者可以通過TopologyContext來獲取處理它的消息的task的id (OutputCollector.emit方法也會返回task的id)。
- Local or shuffle grouping:如果目標bolt有一個或者多個task在同一個工作進程worker中,tuple將會被隨機發生給這些tasks。否則,和普通的Shuffle Grouping行為一致。
6.2 storm 記錄級容錯
-
相比於s4, puma等其他實時計算系統,storm最大的亮點在於其記錄級容錯和能夠保證消息精確處理的事務功能,下面就重點來看一下這兩個亮點的實現原理。
-
Storm記錄級容錯的基本原理。首先來看一下什么叫做記錄級容錯?storm允許用戶在spout中發射一個新的源tuple時為其指定一個message id,這個message id可以是任意的object對象。多個源tuple可以共用一個message id,表示這多個源 tuple對用戶來說是同一個消息單元。storm中記錄級容錯的意思是說,storm會告知用戶每一個消息單元是否在指定時間內被完全處理了。那什么叫做完全處理呢,就是該message id綁定的源tuple及由該源tuple后續生成的tuple經過了topology中每一個應該到達的bolt的處理。舉個例子。如下圖,在spout由message 1綁定的tuple1和tuple2經過了bolt1和bolt2的處理生成兩個新的tuple,並最終都流向了bolt3。當這個過程完成處理完時,稱message 1被完全處理了。
- 在storm的topology中有一個系統級組件,叫做acker。這個acker的任務就是追蹤從spout中流出來的每一個message id綁定的若干tuple的處理路徑,如果在用戶設置的最大超時時間內這些tuple沒有被完全處理,那么acker就會告知spout該消息處理失敗了,相反則會告知spout該消息處理成功了。在剛才的描述中,我們提到了”記錄tuple的處理路徑”,如果曾經嘗試過這么做的同學可以仔細地思考一下這件事的復雜程度。但是storm中卻是使用了一種非常巧妙的方法做到了。在說明這個方法之前,我們來復習一個數學定理。
A xor A = 0.
A xor B…xor B xor A = 0,其中每一個操作數出現且僅出現兩次。 - storm中使用的巧妙方法就是基於這個定理。具體過程是這樣的:在spout中系統會為用戶指定的message id生成一個對應的64位整數,作為一個root id。root id會傳遞給acker及后續的bolt作為該消息單元的唯一標識。同時無論是spout還是bolt每次新生成一個tuple的時候,都會賦予該tuple一個64位的整數的id。Spout發射完某個message id對應的源tuple之后,會告知acker自己發射的root id及生成的那些源tuple的id。而bolt呢,每次接受到一個輸入tuple處理完之后,也會告知acker自己處理的輸入tuple的id及新生成的那些tuple的id。Acker只需要對這些id做一個簡單的異或運算,就能判斷出該root id對應的消息單元是否處理完成了。下面通過一個圖示來說明這個過程。
- 第1步:初始化 spout中綁定message 1生成了兩個源tuple,id分別是0010和1011。
- 第2步:計算一個turple達到第1個bolt。bolt1處理tuple 0010時生成了一個新的tuple,id為0110。
- 第3步:計算一個turple達到第2個bolt,bolt2處理tuple 1011時生成了一個新的tuple,id為0111。
- 第4步:消息到達最后一個bolt。
- 第1步:初始化 spout中綁定message 1生成了兩個源tuple,id分別是0010和1011。
- 可能有些細心的同學會發現,容錯過程存在一個可能出錯的地方,那就是,如果生成的tuple id並不是完全各異的,acker可能會在消息單元完全處理完成之前就錯誤的計算為0。這個錯誤在理論上的確是存在的,但是在實際中其概率是極低極低的,完全可以忽略。
6.3 Storm的事務拓撲
- 事務拓撲(transactional topology)是storm0.7引入的特性,在0.8版本以后的版本中已經被封裝為Trident,提供了更加便利和直觀的接口。因為篇幅所限,在此對事務拓撲做一個簡單的介紹。
- 事務拓撲的目的是為了滿足對消息處理有着極其嚴格要求的場景,例如實時計算某個用戶的成交筆數,要求結果完全精確,不能多也不能少。Storm的事務拓撲是完全基於它底層的spout/bolt/acker原語實現的。通過一層巧妙的封裝得出一個優雅的實現。
- 事務拓撲簡單來說就是將消息分為一個個的批(batch),同一批內的消息以及批與批之間的消息可以並行處理,另一方面,用戶可以設置某些bolt為committer,storm可以保證committer的finishBatch()操作是按嚴格不降序的順序執行的。用戶可以利用這個特性通過簡單的編程技巧實現消息處理的精確。
7 並法度的理解
7.1 Example of a running topology
- The following illustration shows how a simple topology would look like in operation. The topology consists of three components: one spout called BlueSpout and two bolts called GreenBolt and YellowBolt. The components are linked such that BlueSpout sends its output to GreenBolt, which in turns sends its own output to YellowBolt.
- 總結:
- 一個Topology可以包含多個worker ,一個worker只能對應於一個topology。worker process是一個topology的子集。
- 一個worker可以包含多個executor,一個executor只能對應於一個component(spout或者bolt)。
- Task就是具體的處理邏輯,一個executor線程可以執行一個或多個tasks。線程就是資源,task就是要運行的任務。
7.2 並發度的配置有效的順序
- Storm currently has the following order of precedence for configuration settings:
defaults.yaml < storm.yaml < topology-specific configuration < internal component-specific configuration < external component-specific configuration。